Conversation

@ueshin
Member

@ueshin ueshin commented Apr 25, 2016

What changes were proposed in this pull request?

DAGScheduler sometimes generates an incorrect stage graph.

Suppose you have the following DAG:

```
[A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
            \                /
              <-------------
```

Note: [] means an RDD, () means a shuffle dependency.

Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependencies on both B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to understand, let's call the shuffle dependency that produces the shuffled data from A s_A, and the one that produces the shuffled data from B s_B.
The getAncestorShuffleDependencies method in DAGScheduler (incorrectly) does not check for duplicates when adding ShuffleDependencies to the parents data structure, so for this DAG, when getAncestorShuffleDependencies is called on C (the RDD just before the final RDD), it returns s_A, s_B, s_A: s_A gets added twice, once when the method visits RDD C and once when it visits RDD B. This is problematic because this line of code, https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289, then generates a new shuffle stage for each dependency returned by getAncestorShuffleDependencies, resulting in duplicate map stages that compute the map output from RDD A.
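For reference, here is a condensed sketch of the traversal as it stood before this patch (simplified from DAGScheduler.scala, not the exact source; shuffleToMapStage is the scheduler's registry of already-created map stages). The visited set de-duplicates RDDs, not dependencies, so nothing stops the same ShuffleDependency from being pushed once per RDD that declares it:

```scala
import scala.collection.mutable.{HashSet, Stack}

// Condensed sketch of the pre-fix traversal.
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      for (dep <- toVisit.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
              // Bug: no duplicate check on the dependency itself, so s_A is
              // pushed while visiting C and pushed again while visiting B.
              parents.push(shufDep)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
  parents
}
```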

As a result, DAGScheduler generates the following stages and their parents for each shuffle:

| | stage | parents |
|----|----|----|
| s_A | ShuffleMapStage 2 | List() |
| s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) |
| - | ResultStage 4 | List(ShuffleMapStage 3) |

The stage for s_A should be ShuffleMapStage 0, but it is generated twice: ShuffleMapStage 0 is created first and then overwritten by ShuffleMapStage 2, while ShuffleMapStage 1 keeps referring to the old ShuffleMapStage 0.

This patch fixes that.

How was this patch tested?

I added the sample RDD graph above to DAGSchedulerSuite to demonstrate the duplicated stages.
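Roughly, the test builds the DAG above with the suite's MyRDD helper and submits the final RDD; the following is a hedged sketch of that shape (helper signatures and the expected stage numbering are assumptions, not the exact patch):

```scala
// Sketch of the regression test: build the DAG from the description and
// check that each shuffle gets exactly one stage.
val rddA = new MyRDD(sc, 1, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))

val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))

// C depends on both s_A and s_B; this shared ancestor is what used to make
// getAncestorShuffleDependencies return s_A twice.
val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))

val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)
submit(rddD, Array(0))

// Expected after the fix: s_A -> ShuffleMapStage 0, s_B -> ShuffleMapStage 1,
// s_C -> ShuffleMapStage 2, ResultStage 3 -- and no duplicated map stage for A.
```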

val parents = new Stack[ShuffleDependency[_, _, _]]
/**
* Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet.
* This is done in topological order to create ancestor stages first to ensure that the result
Contributor

Can you move this comment somewhere inside the method? It doesn't seem relevant to someone using this method, since the order of the returned ShuffleDependencies doesn't matter.

@markhamstra
Contributor

I think some of the terminology used in this and related PRs is confusing the issues. When @kayousterhout and I ask about "correctness", what we are fundamentally concerned about is whether evaluation of the DAG produces the correct data elements. I don't think that your description of "incorrect" or "illegal" graphs is meant to imply that incorrect data is produced from their evaluation. Correct me if I am wrong, but I think that you are talking exclusively about graphs that are not optimal, causing duplication of effort and preventing further optimizations -- graphs that are taking longer to evaluate than is necessary, not graphs that are producing incorrect data elements.

If I am thinking correctly about this, then the entire effect of this and related PRs is to improve or optimize the DAGScheduler, not to create graphs and schedules that produce different end results than the DAGScheduler does now.

@SparkQA

SparkQA commented Apr 25, 2016

Test build #56868 has finished for PR 12655 at commit 3a8ff84.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 25, 2016

Test build #56873 has finished for PR 12655 at commit cab5264.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented Apr 25, 2016

@markhamstra Thank you for your comment.
I thought the non-optimal state in DAGScheduler was a kind of bug, so I used the words "incorrect" and "illegal", but now I see what you meant.
So yes, I was talking only about the graphs, and I want to improve DAGScheduler performance with this PR and #12060.

@kayousterhout
Contributor

kayousterhout commented Apr 25, 2016

I just updated the JIRA with what I understand to be the issue. Can you take a look and let me know if that's correct? If the simpler example I showed is sufficient to reproduce the issue (and my explanation is correct), can you simplify the unit test to use that example, and also update the JIRA and pull request description to have that text?

Also, if that is a correct explanation of the issue, I think there is a simpler fix than the one you did. What about renaming the method getAncestorShuffleDependencies to createAncestorShuffleMapStages (and having it not return anything)? Then in that method, instead of adding each shuffle dependency to parents, immediately create the shuffle stage there (using the line of code that's currently here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289). That way, the check for whether shuffleIdToMapStage already contains the dependency will work correctly, so we don't create duplicate stages (and that is a simpler change).
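To make that concrete, here is a rough sketch of the suggested shape (the method name is from the comment above; the body is illustrative, not an actual patch). Registering each stage at the moment its dependency is discovered lets the shuffleIdToMapStage check itself prevent duplicates:

```scala
import scala.collection.mutable.{HashSet, Stack}

// Rough sketch of the proposed createAncestorShuffleMapStages: create each
// missing ancestor stage as soon as it is found, instead of collecting the
// dependencies and returning them to the caller.
private def createAncestorShuffleMapStages(rdd: RDD[_], firstJobId: Int): Unit = {
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      for (dep <- toVisit.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleIdToMapStage.contains(shufDep.shuffleId)) {
              // The stage is registered immediately, so reaching the same
              // dependency a second time is a no-op rather than a new stage.
              shuffleIdToMapStage(shufDep.shuffleId) =
                newOrUsedShuffleStage(shufDep, firstJobId)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
}
```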

@ueshin
Member Author

ueshin commented Apr 26, 2016

@kayousterhout
I'll update the unit test, the JIRA, and the PR description to use the simpler example.

As for the createAncestorShuffleMapStages approach, I think it risks a StackOverflowError for a long job: the master-branch version of getAncestorShuffleDependencies finds descendants first, and a descendant with a long lineage can have a lot of ancestors. If we immediately create the shuffle stage there via the newOrUsedShuffleStage method, which builds all ancestor stages in a recursive-call fashion, a StackOverflowError will be thrown.
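As a hypothetical illustration of that risk (the numbers are arbitrary), consider a job whose lineage chains thousands of shuffles; creating the deepest stage first would recurse once per ancestor:

```scala
// Hypothetical deep-lineage job, assuming a SparkContext `sc`. Each loop
// iteration adds one ShuffleDependency, so creating the deepest stage first
// via recursive newOrUsedShuffleStage calls would need one stack frame per
// ancestor shuffle.
var rdd = sc.parallelize(1 to 100).map(x => (x % 10, x))
for (_ <- 1 to 10000) {
  // map(identity) discards the partitioner, forcing reduceByKey to shuffle.
  rdd = rdd.map(identity).reduceByKey(_ + _)
}
rdd.count()  // ~10000 ancestor shuffles; child-first stage creation would overflow the stack
```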

@kayousterhout
Contributor

I see -- now I understand the motivation for returning the shuffle dependencies topologically sorted, because it limits the depth of the recursion (it looks like the old code was trying to do that with the stack, but didn't quite get it right?). Let me think about whether there's a simpler way to accomplish that.

@ueshin
Member Author

ueshin commented Apr 26, 2016

@kayousterhout
I added a comment to JIRA.
Could you take a look at it and let me know which example I should use for the description, the original one or the "simpler" one? (The "simpler" one turns out not to be much simpler than the original.)

@markhamstra
Contributor

It looks like #8923 by @suyanNone fixes at least the test that this PR adds to the DAGSchedulerSuite, and does so much more simply.

@kayousterhout
Contributor

Thanks for pointing that out @markhamstra. I'd be in favor of that solution (but augmented with a clearer test case -- which might just mean adding a nice ascii-art description of the DAG in the test case). What do you think, Mark? @ueshin, do you see any issue with the simpler approach in #8923?

@SparkQA

SparkQA commented May 4, 2016

Test build #57709 has finished for PR 12655 at commit ab92488.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 9, 2016

Test build #58119 has finished for PR 12655 at commit b4e2eb1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin ueshin changed the title [SPARK-13902][SCHEDULER] Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first. [SPARK-13902][SCHEDULER] Make DAGScheduler not to create duplicate stage. May 9, 2016
@SparkQA

SparkQA commented May 9, 2016

Test build #58121 has finished for PR 12655 at commit 55d6b6d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 9, 2016

@kayousterhout I simplified the test and updated the JIRA and the PR title and description.
Please take another look and let me know what else needs to be done before this PR can be merged.

*
* Note: [] means an RDD, () means a shuffle dependency.
*/
test("[SPARK-13902] not to create duplicate stage.") {
Contributor

Can you change this to "[SPARK-13902] Ensure no duplicate stages are created"?

@kayousterhout
Contributor

This change looks good to me at this point (with the one small test name change). @markhamstra are you satisfied with this solution for now, and more significant clean up of this code path can be done in a later PR?

@markhamstra
Contributor

LGTM

@SparkQA

SparkQA commented May 12, 2016

Test build #58429 has finished for PR 12655 at commit 3ceb4d5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 12, 2016

Jenkins, retest this please.

@SparkQA

SparkQA commented May 12, 2016

Test build #58438 has finished for PR 12655 at commit 3ceb4d5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Sorry, there was a build break in master.

retest this please

@ueshin
Member Author

ueshin commented May 12, 2016

Jenkins, retest this please.

@SparkQA

SparkQA commented May 12, 2016

Test build #58452 has finished for PR 12655 at commit 3ceb4d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented May 12, 2016

I took another look -- we should probably merge this into 2.0 too. Kay, can you do that? Thanks!

asfgit pushed a commit that referenced this pull request May 12, 2016
@kayousterhout
Contributor

Merged into 2.0. Thanks @ueshin for your work on this and for bearing with us as we agreed on the simplest solution -- awesome to have this fixed!

@ueshin
Member Author

ueshin commented May 13, 2016

@kayousterhout, @markhamstra Thanks a lot!
When you have time, could you go back to #12060?

zzcclp added a commit to zzcclp/spark that referenced this pull request May 13, 2016
ueshin added a commit to ueshin/apache-spark that referenced this pull request May 27, 2016
ueshin added a commit to ueshin/apache-spark that referenced this pull request Jun 28, 2016