Skip to content

Conversation

@tejasapatil
Copy link
Contributor

What changes were proposed in this pull request?

This is a backport of #15272 to 2.0 branch.

Jira : https://issues.apache.org/jira/browse/SPARK-17698

ExtractEquiJoinKeys is incorrectly using filter predicates as the join condition for joins. canEvaluate [0] tries to see if the an Expression can be evaluated using output of a given Plan. In case of filter predicates (eg. a.id='1'), the Expression passed for the right hand side (ie. '1' ) is a Literal which does not have any attribute references. Thus expr.references is an empty set which theoretically is a subset of any set. This leads to canEvaluate returning true and a.id='1' is treated as a join predicate. While this does not lead to incorrect results but in case of bucketed + sorted tables, we might miss out on avoiding un-necessary shuffle + sort. See example below:

[0] : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91

eg.

val df = (1 until 10).toDF("id").coalesce(1)
hc.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
hc.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

sqlContext.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)

BEFORE: This is doing shuffle + sort over table scan outputs which is not needed as both tables are bucketed and sorted on the same columns and have same number of buckets. This should be a single stage job.

SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>

AFTER :

SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>

How was this patch tested?

  • Added a new test case for this scenario : SPARK-17698 Join predicates should not contain filter clauses
  • Ran all the tests in BucketedReadSuite

@SparkQA
Copy link

SparkQA commented Oct 22, 2016

Test build #67396 has finished for PR 15600 at commit df50838.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Oct 22, 2016

Thanks - merging in. Can you close this?

asfgit pushed a commit that referenced this pull request Oct 22, 2016
## What changes were proposed in this pull request?

This is a backport of #15272 to 2.0 branch.

Jira : https://issues.apache.org/jira/browse/SPARK-17698

`ExtractEquiJoinKeys` is incorrectly using filter predicates as the join condition for joins. `canEvaluate` [0] tries to see if the an `Expression` can be evaluated using output of a given `Plan`. In case of filter predicates (eg. `a.id='1'`), the `Expression` passed for the right hand side (ie. '1' ) is a `Literal` which does not have any attribute references. Thus `expr.references` is an empty set which theoretically is a subset of any set. This leads to `canEvaluate` returning `true` and `a.id='1'` is treated as a join predicate. While this does not lead to incorrect results but in case of bucketed + sorted tables, we might miss out on avoiding un-necessary shuffle + sort. See example below:

[0] : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91

eg.

```
val df = (1 until 10).toDF("id").coalesce(1)
hc.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
hc.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

sqlContext.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```

BEFORE: This is doing shuffle + sort over table scan outputs which is not needed as both tables are bucketed and sorted on the same columns and have same number of buckets. This should be a single stage job.

```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

AFTER :

```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

## How was this patch tested?

- Added a new test case for this scenario : `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`

Author: Tejas Patil <[email protected]>

Closes #15600 from tejasapatil/SPARK-17698_2.0_backport.
@tejasapatil tejasapatil deleted the SPARK-17698_2.0_backport branch October 22, 2016 23:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants