
Conversation

@JoshRosen
Contributor

@JoshRosen JoshRosen commented Oct 12, 2021

What changes were proposed in this pull request?

This PR fixes a longstanding issue where the DAGScheduler's single-threaded event processing loop could become blocked by slow RDD.getPartitions() calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner.

With this patch's change, Spark will now call .partitions on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive getPartitions() calls occur outside of the scheduler event loop.
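The eager computation is performed by a new helper, `eagerlyComputePartitionsForRddAndAncestors` (visible in the reviewed diff further down). As a rough sketch of the idea, assuming only public RDD APIs (the wrapper object and exact structure here are illustrative, not the actual implementation):

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

object EagerPartitionsSketch {
  // Iteratively walk the dependency DAG and touch `.partitions` on every RDD,
  // so any expensive getPartitions() work runs on the submitting thread rather
  // than inside the DAGScheduler's single-threaded event loop.
  def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
    val visited = mutable.HashSet[RDD[_]]()
    val waitingForVisit = mutable.Stack[RDD[_]](rdd) // explicit stack, no recursion
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.partitions // forces (and caches) getPartitions()
        toVisit.dependencies.foreach(dep => waitingForVisit.push(dep.rdd))
      }
    }
  }
}
```

An explicit worklist is used instead of recursion so that very deep lineage chains don't overflow the stack, matching the note further down about traversing huge DAGs.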

Background

The RDD.partitions method lazily computes an RDD's partitions by calling RDD.getPartitions(). The getPartitions() method is invoked only once per RDD and its result is cached in the RDD.partitions_ private field. Sometimes the getPartitions() call can be expensive: for example, HadoopRDD.getPartitions() performs file listing operations.
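Concretely, the compute-once-and-cache behavior looks roughly like this (a simplified sketch that ignores the checkpointing and locking the real method also handles; the class name is illustrative):

```scala
import org.apache.spark.Partition

// Simplified sketch of RDD.partitions' lazy caching, not the actual implementation.
abstract class LazyPartitionsSketch {
  // Subclasses implement this; it may be expensive (e.g. HadoopRDD's file listing).
  protected def getPartitions: Array[Partition]

  @volatile private var partitions_ : Array[Partition] = _

  final def partitions: Array[Partition] = {
    if (partitions_ == null) {
      partitions_ = getPartitions // the first caller pays the cost...
    }
    partitions_                   // ...subsequent callers reuse the cached result
  }
}
```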

The .partitions method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it's often the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, submitJob calls rdd.partitions.length, so the DAG root's partitions will be computed outside of the scheduler event loop.

However, there are still some cases where partitions gets evaluated for the first time inside of the DAGScheduler internals. For example, ShuffledRDD.getPartitions doesn't call .partitions on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in .partitions having been called on all RDDs prior to scheduler job submission.
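To make the ShuffledRDD case concrete, here is an illustrative sketch (not the real class): the shuffle output's partitions depend only on the Partitioner's numPartitions, so nothing here forces the parent RDD's partitions.

```scala
import org.apache.spark.{Partition, Partitioner}

object ShuffledRddSketch {
  // Illustrative stand-in for ShuffledRDD's partition objects.
  class ShuffleOutputPartition(val index: Int) extends Partition

  // The shuffle output needs only part.numPartitions, so the parent RDD's
  // `.partitions` (and hence its possibly-slow getPartitions()) is never called.
  def getPartitions(part: Partitioner): Array[Partition] =
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffleOutputPartition(i))
}
```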

Correctness: proving that we make no excess .partitions calls

This PR adds code to traverse the DAG prior to job submission and call .partitions on every RDD encountered.

I'd like to argue that this results in no excess .partitions calls: in every case where the new code calls .partitions there is existing code which would have called .partitions at some point during a successful job execution:

  • Assume that this is the first time we are computing every RDD in the DAG.
  • Every RDD appears in some stage.
  • submitStage will call submitMissingTasks on every stage root RDD.
  • submitMissingTasks calls getPreferredLocsInternal on every stage root RDD.
  • getPreferredLocsInternal visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls .partitions on each RDD visited.
  • Therefore .partitions is invoked on every RDD in the DAG by the time the job has successfully completed.
  • Therefore this patch's change does not introduce any new calls to .partitions which would not have otherwise occurred (assuming the job succeeded).

Ordering of .partitions calls

I don't think the order in which .partitions calls occur matters for correctness: the DAGScheduler happens to invoke .partitions in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order .partitions calls occurring elsewhere in the codebase.

Handling of exceptions in .partitions

I've chosen not to add special error-handling for the new .partitions calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job.

It's sometimes important to preserve exception wrapping behavior, but I don't think that concern is warranted in this particular case: whether getPartitions occurred inside or outside of the scheduler (impacting whether exceptions manifest in wrapped or unwrapped form, and impacting whether failed jobs appear in the Spark UI) was not crisply defined (and in some rare cases could even be influenced by Spark settings in non-obvious ways), so I think it's both unlikely that users were relying on the old behavior and very difficult to preserve it.

Should this have a configuration flag?

Per discussion from a previous PR trying to solve this problem (#24438 (review)), I've decided to skip adding a configuration flag for this.

Why are the changes needed?

This fixes a longstanding scheduler performance problem which has been reported by multiple users.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I added a regression test in BasicSchedulerIntegrationSuite to cover the regular job submission codepath (DAGScheduler.submitJob). This test uses CountDownLatches to simulate the submission of a job containing an RDD with a slow getPartitions() call and checks that a concurrently-submitted job is not blocked.
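The gist of the test can be sketched as follows (illustrative only; the real test lives in BasicSchedulerIntegrationSuite and uses the SchedulerIntegrationSuite mock backend rather than a local SparkContext, and the class and object names here are mine):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch of the testing idea: an RDD whose getPartitions() blocks on a latch
// must not prevent a concurrently submitted job from completing.
class SlowGetPartitionsRDD(sc: SparkContext, @transient private val latch: CountDownLatch)
  extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    latch.await(30, TimeUnit.SECONDS) // stands in for a slow file-listing call
    Array(new Partition { override def index: Int = 0 })
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(1)
}

object SlowGetPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "slow-getPartitions-sketch")
    val latch = new CountDownLatch(1)

    // Submit the "slow" job from a background thread; its getPartitions() blocks there.
    val slowJob = Future(new SlowGetPartitionsRDD(sc, latch).collect())

    // The concurrently submitted job should still complete promptly, because the
    // slow getPartitions() runs on the submitting thread, not in the event loop.
    assert(sc.parallelize(1 to 10, 2).count() == 10)

    latch.countDown() // unblock the slow job and let it finish
    Await.result(slowJob, 1.minute)
    sc.stop()
  }
}
```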

I have not added separate integration tests for the runApproximateJob and submitMapStage codepaths (both of which also received the same fix).

@github-actions github-actions bot added the CORE label Oct 12, 2021
@JoshRosen
Contributor Author

This is a longstanding issue and there have been multiple previous attempts to fix it.

Some early attempts were rejected due to thread-safety issues with their approaches or became stale without review.

This PR's approach is very similar to @ajithme's approach in #27234, with a few key differences:

  • I allowed exceptions to bubble instead of logging and ignoring them.
  • I used a faster and less-race-condition-prone testing approach (using the SchedulerIntegrationSuite framework).
  • I used a non-recursive tree-traversal method (based on similar existing methods) to avoid stack overflow errors when traversing huge DAGs.
  • I also added the fix to submitMapStage and runApproximateJob: these are much less commonly used codepaths but can still potentially benefit from the fix.

@JoshRosen JoshRosen requested a review from srowen October 12, 2021 23:57
@JoshRosen
Contributor Author

JoshRosen commented Oct 12, 2021

/cc @squito @dongjoon-hyun @vanzin @srowen @yuchenhuo @jiangxb1987 for review (since they were also tagged in previous PRs for this issue).

Member

@srowen srowen left a comment


I don't know much about this, but your thorough argument is pretty convincing. If it just front-loads work and doesn't do more work, and avoids runtime wobbles, seems good.

@SparkQA

SparkQA commented Oct 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48645/

@SparkQA

SparkQA commented Oct 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48645/

@jiangxb1987
Contributor

Does this change have any impact on job cancellation? For example, after this change, if I cancel a job shortly after the job is submitted, will I fail to actually cancel this job because the DAGScheduler is still working on resolving the RDD partitions (so it hasn't updated the jobIdToStageIds map)?

@SparkQA

SparkQA commented Oct 13, 2021

Test build #144167 has finished for PR 34265 at commit 3bcc554.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@mridulm mridulm left a comment


Change looks good, thanks for working on this @JoshRosen!
Do you want to add timing info in debug mode?

@mridulm
Contributor

mridulm commented Oct 13, 2021

Does this change have any impact on job cancellation? For example, after this change, if I cancel a job shortly after the job is submitted, will I fail to actually cancel this job because the DAGScheduler is still working on resolving the RDD partitions (so it hasn't updated the jobIdToStageIds map)?

This is happening within submission itself, so the caller does not yet have a jobId to cancel on (until this completes).

Contributor

@jiangxb1987 jiangxb1987 left a comment


LGTM

@JoshRosen
Contributor Author

@jiangxb1987, I don't think this will adversely impact job cancellation via cancelJobGroup:

The three DAGScheduler methods that I modified are invoked from the main Spark application thread, not the scheduler event thread. At this point no jobs have been submitted.

Just for the sake of argument and completeness, let's say that we had a slow getPartitions call executing inside of the scheduler: before this change, that slow getPartitions wouldn't have been cancellable or interruptible at all, since we'd have had to wait for it to complete before the job group cancellation event could be processed by the DAGScheduler's event loop.

After this patch's change, that slow work is performed outside of the DAGScheduler. Even though there's no job to cancel, what happens if we're running in a notebook or REPL environment and want to cancel the running cell / command? This depends on the behavior of the notebook/REPL: if the notebook/REPL sends a Thread.interrupt() to the running driver thread then the getPartitions call might be able to exit early, depending on whether it's running code which checks for thread interrupts (such as IO, e.g. Hadoop filesystem listing operations).

Given this, I think we're okay: we haven't impacted job group cancellation and the driver thread interruption situation is the same as it would be for other slow driver-side operations (such as query planning in Spark SQL).


As an aside, I do think it's a bit confusing that the DAGScheduler class mixes public methods (called outside the event loop) and private methods used inside the loop into the same class, with different rules around accessing private state. If we wanted to re-architect things, I think it would be clearer to separate those responsibilities into separate classes (maybe something like DAGSchedulerClient and DAGSchedulerBackend) to more strongly enforce this separation and to prevent accidental access of event-loop state from outside of the loop.

@JoshRosen
Contributor Author

Do you want to add timing info in debug mode?

@mridulm, good idea: I added logging in 64050f2

@dongjoon-hyun
Member

Thank you for pinging me, @JoshRosen .

@SparkQA

SparkQA commented Oct 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48690/

@SparkQA

SparkQA commented Oct 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48690/

@SparkQA

SparkQA commented Oct 13, 2021

Test build #144211 has finished for PR 34265 at commit 64050f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)

Member

It looks like, before the three places calling eagerlyComputePartitionsForRddAndAncestors, there are already some accesses of rdd.partitions, so we really only need to compute its ancestors?

Contributor Author

That's true, but I chose not to make that optimization because (a) it doesn't actually matter from a performance perspective (accessing already-computed .partitions is very cheap) and (b) I think the optimization would make the code more complex.

Contributor

@yuchenhuo yuchenhuo left a comment

Thanks for fixing this! One suggestion for testing; I'm not sure if it's a good idea though.

// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)
Contributor

Would it be a good idea to add an assertion in the DebugFilesystem we have to check that it's not accessed within the event loop thread? It might help catch other cases where the event loop might be doing heavy blocking operations.
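For illustration, the suggested guard could look roughly like this (a hypothetical sketch, assuming the event loop thread keeps its usual "dag-scheduler-event-loop" name; the object and method names here are made up, and the actual follow-up is tracked in SPARK-37009):

```scala
object EventLoopGuard {
  // Hypothetical assertion a test filesystem could run before doing blocking IO.
  def assertNotOnSchedulerEventLoop(): Unit = {
    val threadName = Thread.currentThread().getName
    assert(!threadName.contains("dag-scheduler-event-loop"),
      s"Blocking filesystem call on the scheduler event loop thread: $threadName")
  }
}
```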

Contributor Author

I think that's a good idea, but I'd like to defer it to a separate followup PR. I've filed https://issues.apache.org/jira/browse/SPARK-37009 to track that.

@JoshRosen
Contributor Author

Thank you to everyone who helped to review this PR.

I'm merging this into master, branch-3.2, branch-3.1, and branch-3.0.

@JoshRosen JoshRosen closed this in c4e975e Oct 14, 2021
JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

JoshRosen added a commit that referenced this pull request Oct 14, 2021
… submitting job to DAGScheduler

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
… submitting job to DAGScheduler

fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
… submitting job to DAGScheduler

catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
… submitting job to DAGScheduler

catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
… submitting job to DAGScheduler

