
Conversation

Contributor

@davies davies commented Feb 16, 2015

Currently, PySpark does not use a narrow dependency during cogroup/join even when the two RDDs share the same partitioner, so an unnecessary shuffle stage is introduced.

The Python implementation of cogroup/join is different from the Scala one: it is built on union() and partitionBy(). This patch makes union() use PartitionerAwareUnionRDD() when all the input RDDs have the same partitioner. It also fixes `preservesPartitioning` in the relevant map() and mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.
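To make the union()/partitionBy() structure concrete, here is a hedged, plain-Python sketch (not the actual PySpark internals; names and shapes are illustrative): cogroup tags values from each side, unions them, then groups by key, which is the step partitionBy() performs. When both inputs already share a partitioner, that final grouping needs no shuffle.

```python
def cogroup(rdd_a, rdd_b):
    # Tag each (key, value) pair with the side it came from, as the
    # union() step does.
    tagged = [(k, (0, v)) for k, v in rdd_a] + [(k, (1, v)) for k, v in rdd_b]
    # Group per key, as partitionBy() + mapPartitions() would.
    result = {}
    for k, (side, v) in tagged:
        result.setdefault(k, ([], []))[side].append(v)
    return result

print(cogroup([(1, "a")], [(1, "x"), (2, "y")]))
# {1: (['a'], ['x']), 2: ([], ['y'])}
```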


SparkQA commented Feb 16, 2015

Test build #27573 has started for PR 4629 at commit eb26c62.

  • This patch merges cleanly.

Contributor

Can we change this method to call the union method that you modified so the change will take effect here, too?

Contributor Author

fixed


SparkQA commented Feb 16, 2015

Test build #27582 has started for PR 4629 at commit ff5a0a6.

  • This patch merges cleanly.


SparkQA commented Feb 16, 2015

Test build #27583 has started for PR 4629 at commit 940245e.

  • This patch merges cleanly.


SparkQA commented Feb 16, 2015

Test build #27587 has started for PR 4629 at commit cc28d97.

  • This patch merges cleanly.


SparkQA commented Feb 16, 2015

Test build #27573 has finished for PR 4629 at commit eb26c62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27573/


SparkQA commented Feb 16, 2015

Test build #27582 has finished for PR 4629 at commit ff5a0a6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27582/


SparkQA commented Feb 16, 2015

Test build #610 has started for PR 4629 at commit cc28d97.

  • This patch merges cleanly.


SparkQA commented Feb 16, 2015

Test build #27583 has finished for PR 4629 at commit 940245e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27583/


SparkQA commented Feb 16, 2015

Test build #27587 has finished for PR 4629 at commit cc28d97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):
    • case class ParquetRelation2(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27587/


SparkQA commented Feb 17, 2015

Test build #610 has finished for PR 4629 at commit cc28d97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

If `_.partitioner` is an Option, then I think this can be simplified by using flatMap instead of map: that would let you just check whether partitioners.size == 1 on the next line, without also needing the isDefined check.
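For illustration, here is the check sketched in plain Python (the code under review is Scala; the function and the string partitioner labels below are made up). Dropping the undefined partitioners flatMap-style and then comparing counts expresses "all defined and all identical" in one place:

```python
def single_shared_partitioner(partitioners):
    """True iff every RDD has a partitioner and they are all equal."""
    # The flatMap-over-Option analogue: drop the missing (None) entries.
    defined = [p for p in partitioners if p is not None]
    # Nothing was dropped, and only one distinct partitioner remains.
    return len(defined) == len(partitioners) and len(set(defined)) == 1

assert single_shared_partitioner(["hash-4", "hash-4"])
assert not single_shared_partitioner(["hash-4", None])
assert not single_shared_partitioner(["hash-4", "hash-8"])
```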

@JoshRosen
Contributor

LGTM overall; this is tricky logic, though, so I'll take one more pass through when I get home.


SparkQA commented Feb 17, 2015

Test build #27612 has started for PR 4629 at commit 4d29932.

  • This patch merges cleanly.


SparkQA commented Feb 17, 2015

Test build #27612 has finished for PR 4629 at commit 4d29932.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27612/


SparkQA commented Feb 17, 2015

Test build #611 has started for PR 4629 at commit 4d29932.

  • This patch merges cleanly.


SparkQA commented Feb 17, 2015

Test build #611 has finished for PR 4629 at commit 4d29932.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Do these tests actually check for a narrow dependency at all? I think they would pass even without it.

I'm not sure of a better suggestion, though. I had to use getNarrowDependencies in another PR to check this:
https://github.com/apache/spark/pull/4449/files#diff-4bc3643ce90b54113cad7104f91a075bR582

but I don't think that is even exposed in pyspark ...

Contributor Author

This test is only for correctness; I will add more checks for the narrow dependency based on the Python progress API (#3027).

Contributor

I've merged #3027, so I think we can now test this by setting a job group, running a job, then querying the statusTracker to determine how many stages were actually run as part of that job.
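A rough sketch of that testing strategy in plain Python, with a stub standing in for the tracker (the real test would use sc.setJobGroup() and sc.statusTracker() from the PySpark API; the stub class, its stage numbers, and the group name below are all illustrative):

```python
from collections import namedtuple

SparkJobInfo = namedtuple("SparkJobInfo", "jobId stageIds status")

class StubTracker:
    """Stands in for the small subset of StatusTracker the check needs."""
    def __init__(self, stages_by_job):
        self.stages_by_job = stages_by_job  # jobId -> list of stage ids
    def getJobIdsForGroup(self, group):
        return sorted(self.stages_by_job)
    def getJobInfo(self, job_id):
        return SparkJobInfo(job_id, self.stages_by_job[job_id], "SUCCEEDED")

def stages_run(tracker, group):
    # Count the distinct stages executed across all jobs in the group.
    stage_ids = set()
    for job_id in tracker.getJobIdsForGroup(group):
        stage_ids.update(tracker.getJobInfo(job_id).stageIds)
    return len(stage_ids)

# With a narrow dependency, the join job should run fewer stages than a
# join that re-shuffles (the stage counts below are made up for the stub).
narrow = StubTracker({0: [0, 1, 2]})
shuffled = StubTracker({0: [0, 1, 2, 3]})
assert stages_run(narrow, "join-group") < stages_run(shuffled, "join-group")
```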

Contributor Author

done!

Contributor

nice!


SparkQA commented Feb 17, 2015

Test build #27657 has started for PR 4629 at commit dffe34e.

  • This patch merges cleanly.


SparkQA commented Feb 17, 2015

Test build #27657 has finished for PR 4629 at commit dffe34e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):
    • class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
    • class SparkStageInfo(namedtuple("SparkStageInfo",
    • class StatusTracker(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27657/

@JoshRosen
Contributor

Thanks for adding the test.

LGTM, so I'm going to merge this into master (1.4.0) and branch-1.3 (1.3.0). Thanks!

asfgit pushed a commit that referenced this pull request Feb 18, 2015
Currently, PySpark does not use a narrow dependency during cogroup/join even when the two RDDs share the same partitioner, so an unnecessary shuffle stage is introduced.

The Python implementation of cogroup/join is different from the Scala one: it is built on union() and partitionBy(). This patch makes union() use PartitionerAwareUnionRDD() when all the input RDDs have the same partitioner. It also fixes `preservesPartitioning` in the relevant map() and mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.

Author: Davies Liu <[email protected]>

Closes #4629 from davies/narrow and squashes the following commits:

dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark

(cherry picked from commit c3d2b90)
Signed-off-by: Josh Rosen <[email protected]>
@asfgit asfgit closed this in c3d2b90 Feb 18, 2015
