Skip to content

Conversation

@yhuai
Copy link
Contributor

@yhuai yhuai commented Jul 27, 2015

This PR introduces three improvements to SQL planner..

First, it adds an optimization rule FilterNullsInJoinKey to add Filter before join operators to filter out rows having null values for join keys.

Second, it adds NullUnsafeClusteredDistribution and NullUnsafeHashPartitioning, which can be used to distribute rows having null values for join keys evenly. NullUnsafeClusteredDistribution is basically the same with ClusteredDistribution (now renamed to NullSafeClusteredDistribution) except that it does not require rows having null values for join keys be clustered.

Third, it adds PartitioningCollection, which is used to represent the outputPartitioning for SparkPlans with multiple children (e.g. ShuffledHashJoin). So, a SparkPlan can have multiple descriptions of its partitioning schemes. Taking ShuffledHashJoin as an example, it has two descriptions of its partitioning schemes, i.e. left.outputPartitioning and right.outputPartitioning. So when we have a query like select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x) will only have three Exchange operators (when shuffled joins are needed) instead of four.

Optimizations in the first and second improvement are guarded by spark.sql.advancedOptimization.

I will add more comments/doc and test later.

@yhuai yhuai changed the title [SPARK-2205] [SPARK-7871] [SPARK-9372] [SQL] [WIP] Three SQL optimziations [SPARK-2205] [SPARK-7871] [SPARK-9372] [SQL] [WIP] Improving SQL query planner Jul 27, 2015
@SparkQA
Copy link

SparkQA commented Jul 27, 2015

Test build #38507 has finished for PR 7685 at commit e66d5a9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AtLeastNNulls(n: Int, children: Seq[Expression]) extends Predicate
    • case class AtLeastNNonNullNans(n: Int, children: Seq[Expression]) extends Predicate
    • class DefaultOptimizer extends Optimizer
    • case class NullSafeClusteredDistribution(clustering: Seq[Expression]) extends Distribution
    • case class NullUnsafeClusteredDistribution(clustering: Seq[Expression]) extends Distribution
    • case class NullSafeHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class NullUnsafeHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class PartitioningCollection(partitionings: Seq[Partitioning])
    • case class FilterNullsInJoinKey(

@SparkQA
Copy link

SparkQA commented Jul 27, 2015

Test build #38511 has finished for PR 7685 at commit 13d1c9e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 27, 2015

Test build #38588 has finished for PR 7685 at commit d3d2e64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 27, 2015

Test build #38606 has finished for PR 7685 at commit c57a954.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AtLeastNNulls(n: Int, children: Seq[Expression]) extends Predicate
    • case class AtLeastNNonNullNans(n: Int, children: Seq[Expression]) extends Predicate
    • class DefaultOptimizer extends Optimizer
    • case class ClusteredDistribution(
    • case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int, nullSafe: Boolean)
    • case class PartitioningCollection(partitionings: Seq[Partitioning])
    • case class FilterNullsInJoinKey(

@JoshRosen
Copy link
Contributor

@yhuai, do you think that we should carve this up into multiple PRs to ease reviews? It looks like FilterNullsInJoinKey should be easy to split out.

@yhuai
Copy link
Contributor Author

yhuai commented Jul 28, 2015

@JoshRosen yeah. Let me split it.

@yhuai
Copy link
Contributor Author

yhuai commented Jul 28, 2015

btw, FilterNullsInJoinKeySuite is not deterministic. I will change it later.

asfgit pushed a commit that referenced this pull request Aug 3, 2015
…joins

This PR adds `PartitioningCollection`, which is used to represent the `outputPartitioning` for SparkPlans with multiple children (e.g. `ShuffledHashJoin`). So, a `SparkPlan` can have multiple descriptions of its partitioning schemes. Taking `ShuffledHashJoin` as an example, it has two descriptions of its partitioning schemes, i.e. `left.outputPartitioning` and `right.outputPartitioning`. So when we have a query like `select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x)` will only have three Exchange operators (when shuffled joins are needed) instead of four.

The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7773)
<!-- Reviewable:end -->

Author: Yin Huai <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #7773 from JoshRosen/multi-way-join-planning-improvements and squashes the following commits:

5c45924 [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
cd8269b [Josh Rosen] Refactor test to use SQLTestUtils
2963857 [Yin Huai] Revert unnecessary SqlConf change.
73913f7 [Yin Huai] Add comments and test. Also, revert the change in ShuffledHashOuterJoin for now.
4a99204 [Josh Rosen] Delete unrelated expression change
884ab95 [Josh Rosen] Carve out only SPARK-2205 changes.
247e5fa [Josh Rosen] Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
asfgit pushed a commit that referenced this pull request Aug 3, 2015
This PR adds an optimization rule, `FilterNullsInJoinKey`, to add `Filter` before join operators to filter out rows having null values for join keys.

This optimization is guarded by a new SQL conf, `spark.sql.advancedOptimization`.

The code in this PR was authored by yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.

Author: Yin Huai <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #7768 from JoshRosen/filter-nulls-in-join-key and squashes the following commits:

c02fc3f [Yin Huai] Address Josh's comments.
0a8e096 [Yin Huai] Update comments.
ea7d5a6 [Yin Huai] Make sure we do not keep adding filters.
be88760 [Yin Huai] Make it clear that FilterNullsInJoinKeySuite.scala is used to test FilterNullsInJoinKey.
8bb39ad [Yin Huai] Fix non-deterministic tests.
303236b [Josh Rosen] Revert changes that are unrelated to null join key filtering
40eeece [Josh Rosen] Merge remote-tracking branch 'origin/master' into filter-nulls-in-join-key
c57a954 [Yin Huai] Bug fix.
d3d2e64 [Yin Huai] First round of cleanup.
f9516b0 [Yin Huai] Style
c6667e7 [Yin Huai] Add PartitioningCollection.
e616d3b [Yin Huai] wip
7c2d2d8 [Yin Huai] Bug fix and refactoring.
69bb072 [Yin Huai] Introduce NullSafeHashPartitioning and NullUnsafePartitioning.
d5b84c3 [Yin Huai] Do not add unnessary filters.
2201129 [Yin Huai] Filter out rows that will not be joined in equal joins early.
@yhuai
Copy link
Contributor Author

yhuai commented Aug 3, 2015

I am closing it.

@yhuai yhuai closed this Aug 3, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants