
Conversation

@ajithme ajithme commented Apr 23, 2019

What changes were proposed in this pull request?

DAGScheduler becomes a bottleneck in a cluster when multiple JobSubmitted events have to be processed, because DAGSchedulerEventProcessLoop is single threaded and JobSubmitted processing blocks other events in the queue, such as TaskCompletion.
Handling a JobSubmitted event can be time consuming depending on the nature of the job (for example, calculating parent stage dependencies, shuffle dependencies, and partitions), and thus it blocks all subsequent events from being processed.

Similarly, in my cluster the partition calculation for some jobs is time consuming (similar to the stack trace in SPARK-2647), which slows down the DAGSchedulerEventProcessLoop and causes user jobs to slow down even when their tasks finish within seconds, because TaskCompletion events are processed at a slower rate due to the blockage.

Refer: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Scheduler-Spark-DAGScheduler-scheduling-performance-hindered-on-JobSubmitted-Event-td23562.html

I see multiple JIRAs referring to this behavior:
https://issues.apache.org/jira/browse/SPARK-2647
https://issues.apache.org/jira/browse/SPARK-4961

Forcing partition evaluation on the submitting thread, before the job reaches the scheduler, can help mitigate this.
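A minimal sketch of what that mitigation can look like on the caller side (the helper below is illustrative, not part of this PR's change): walk the RDD lineage on the submitting thread and touch `.partitions` on each RDD so the expensive `getPartitions()` work runs before the JobSubmitted event is posted.

```scala
import scala.collection.mutable

import org.apache.spark.rdd.RDD

object EagerPartitions {
  // Illustrative helper (not from this PR): force partition computation for an
  // RDD and all of its ancestors on the calling thread, so that expensive
  // getPartitions() work stays out of the DAGSchedulerEventProcessLoop.
  def eagerlyComputePartitions(root: RDD[_]): Unit = {
    val visited = mutable.HashSet[RDD[_]]()
    def visit(rdd: RDD[_]): Unit = {
      if (visited.add(rdd)) {
        rdd.partitions                        // triggers and caches getPartitions()
        rdd.dependencies.foreach(dep => visit(dep.rdd))
      }
    }
    visit(root)
  }
}
```

Calling something like `EagerPartitions.eagerlyComputePartitions(rdd)` right before an action keeps slow partition computation (for example, file listing in a HadoopRDD) off the scheduler's event loop.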

How was this patch tested?

  1. Added a unit test.
  2. Manually tested to verify the blockage before and after applying the patch.

ajithme commented Apr 23, 2019

@squito @srowen @shivaram @markhamstra @HyukjinKwon

After getting comments from #20770, I have checked for a narrower fix. Please review: does materialising partitions outside the DAG event loop have any side effects?

@squito squito left a comment


Just discussing the ramifications of this w/out any opinion:

Moving rdd.partitions to the user thread that submits the job, outside of the dagscheduler event loop, is interesting. It's not exposing internal scheduler state that needs to be protected inside the dag scheduler event loop -- OTOH, rdd.partitions was implicitly protected before, so this could expose user code that does something not thread safe in a custom partitioner. So maybe _partitions should be protected by a synchronized block.

That said, Spark already hasn't really had that protection in place -- _partitions is already accessed by other methods, e.g. sortByKey() and doCheckpoint(), outside of the dagscheduler event loop.
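For illustration, a minimal sketch of the kind of synchronized guard being suggested, with stand-in names and types rather than Spark's actual `RDD` code:

```scala
// Stand-in sketch, not Spark's RDD: one way a lazily cached partitions field
// could be guarded so that user-supplied partition computation is never
// entered concurrently from multiple threads.
abstract class LazilyPartitioned {
  protected def getPartitions(): Array[AnyRef]   // stand-in for Array[Partition]

  @volatile private var partitions_ : Array[AnyRef] = _

  final def partitions: Array[AnyRef] = {
    if (partitions_ == null) {
      synchronized {
        if (partitions_ == null) {               // double-checked locking
          partitions_ = getPartitions()
        }
      }
    }
    partitions_
  }
}
```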

more opinion: I'm kinda ambivalent. might be OK, but I feel like it requires touching some old things (always a bit dangerous) for what seems like a niche case. agree that if we want to do this, it doesn't really make sense to have it behind a conf (other than perhaps an undocumented "escape-hatch").

ajithme commented May 2, 2019

@squito thanks for the feedback and pointers.

  1. Will remove the configuration; @srowen also had a similar opinion about adding a new configuration.
  2. Will look into synchronized handling of _partitions.

ajithme commented May 3, 2019

@squito I have addressed the review comments and updated the PR. Please review.

I have one question about org.apache.spark.rdd.RDD#partitions, where the internal data structure is returned: should this be changed to return a copy instead?

@ajithme ajithme force-pushed the dagsche branch 2 times, most recently from b2382ca to 5cb86d5 on June 1, 2019
ajithme commented Jun 1, 2019

@squito @srowen I had to rebase due to upstream conflicts; please review.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 31, 2019
@github-actions github-actions bot closed this Jan 1, 2020
JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

### What changes were proposed in this pull request?

This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner.

With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

#### Background

The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.

The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it's _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L837), so the DAG root's partitions will be computed outside of the scheduler event loop.

However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior to scheduler job submission.
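To make the scenario concrete, a hedged example (the path, master, and partition count are made up, and exact behavior can vary with version and configuration, e.g. how the partitioner is chosen):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job matching the case described above: the DAG's root is a
// ShuffledRDD, whose getPartitions() does not force the parent HadoopRDD's
// (potentially slow) file listing.
object ShuffleRootedJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("shuffle-rooted-job"))

    val counts = sc.textFile("hdfs://namenode/some/large/dir")  // HadoopRDD underneath
      .map(line => (line, 1))
      .reduceByKey(_ + _, 8)      // the root of the DAG is now a ShuffledRDD

    // Forces only the ShuffledRDD's own partitions (sized by its partitioner),
    // not the HadoopRDD's file listing.
    counts.partitions

    // Without this patch, the HadoopRDD's getPartitions() may therefore run for
    // the first time inside the DAGScheduler event loop during this action.
    counts.count()
    sc.stop()
  }
}
```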

#### Correctness: proving that we make no excess `.partitions` calls

This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered.

I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions` there is existing code which would have called `.partitions` at some point during a successful job execution:

- Assume that this is the first time we are computing every RDD in the DAG.
- Every RDD appears in some stage.
- [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
- [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
- [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
- Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
- Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).

#### Ordering of `.partitions` calls

I don't think the order in which `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order `.partitions` calls occurring elsewhere in the codebase.

#### Handling of exceptions in `.partitions`

I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job.

It's sometimes important to preserve exception wrapping behavior, but I don't think that concern is warranted in this particular case: whether `getPartitions` occurred inside or outside of the scheduler (impacting whether exceptions manifest in wrapped or unwrapped form, and impacting whether failed jobs appear in the Spark UI) was not crisply defined (and in some rare cases could even be [influenced by Spark settings in non-obvious ways](https://github.com/apache/spark/blob/10d5303174bf4a47508f6227bbdb1eaf4c92fcdb/core/src/main/scala/org/apache/spark/Partitioner.scala#L75-L79)), so I think it's both unlikely that users were relying on the old behavior and very difficult to preserve it.

#### Should this have a configuration flag?

Per discussion from a previous PR trying to solve this problem (#24438 (review)), I've decided to skip adding a configuration flag for this.

### Why are the changes needed?

This fixes a longstanding scheduler performance problem which has been reported by multiple users.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a regression test in `BasicSchedulerIntegrationSuite` to cover the regular job submission codepath (`DAGScheduler.submitJob`). This test uses CountDownLatches to simulate the submission of a job containing an RDD with a slow `getPartitions()` call and checks that a concurrently-submitted job is not blocked.
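For context, a rough sketch of the kind of RDD such a test can be built around (class name and details are illustrative, not the actual test code in `BasicSchedulerIntegrationSuite`):

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative only: an RDD whose getPartitions() blocks on a latch, letting a
// test hold up partition computation for one job while asserting that a
// concurrently submitted job still completes.
class SlowGetPartitionsRDD(sc: SparkContext, latch: CountDownLatch)
  extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    latch.await()                               // simulate a slow getPartitions()
    Array(new Partition { override def index: Int = 0 })
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}
```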

I have **not** added separate integration tests for the `runApproximateJob` and `submitMapStage` codepaths (both of which also received the same fix).

Closes #34265 from JoshRosen/SPARK-23626.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
… submitting job to DAGScheduler

fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
… submitting job to DAGScheduler

catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
… submitting job to DAGScheduler

catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
… submitting job to DAGScheduler
