
Conversation

@aray (Contributor) commented Jul 21, 2017

What changes were proposed in this pull request?

In some complex queries where the same table is joined multiple times, interleaved with aggregation, conflicting attributes can leak through output partitionings. The planner then believes the data is already partitioned correctly, so no shuffle is inserted and the query returns wrong results. See the JoinSuite diff for an example. This patch adds a method to Partitioning that restricts it to a given set of output attributes; the method is called by operators that generally preserve their input partitioning but output only a subset of their input attributes.
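
A minimal sketch of the idea on simplified stand-in types (plain string attribute names rather than Catalyst expressions); it illustrates the shape of the fix, not the actual patch code:

// Toy model: a hash partitioning that references attributes missing
// from the operator's output is downgraded to "unknown", which forces
// the planner to insert a shuffle where one is required.
sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(attrs: Seq[String], numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

def restrict(p: Partitioning, outputSet: Set[String]): Partitioning = p match {
  case h: HashPartitioning if h.attrs.forall(outputSet.contains) => h
  case other => UnknownPartitioning(other.numPartitions)
}

// Example: a projection keeps only "level1", so a partitioning on
// "parent" no longer describes its output:
// restrict(HashPartitioning(Seq("parent"), 5), Set("level1")) == UnknownPartitioning(5)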

How was this patch tested?

A unit test based on the example code from the JIRA, plus additional unit tests of the new method.

@aray (Contributor Author) commented Jul 21, 2017

Plan for the example query before the patch (with partitioning as suffix):

*HashAggregate(keys=[parent#228], functions=[], output=[level2#274]) hashpartitioning(parent#228, 5)
+- *HashAggregate(keys=[parent#228], functions=[], output=[parent#228]) hashpartitioning(parent#228, 5)
   +- *Project [parent#228] hashpartitioning(parent#228, 5)
      +- *BroadcastHashJoin [level1#270], [son#227], Inner, BuildRight hashpartitioning(parent#228, 5)
         :- *HashAggregate(keys=[parent#228], functions=[], output=[level1#270]) hashpartitioning(parent#228, 5)
         :  +- Exchange hashpartitioning(parent#228, 5) hashpartitioning(parent#228, 5)
         :     +- *HashAggregate(keys=[parent#228], functions=[], output=[parent#228]) UnknownPartitioning(0)
         :        +- *Project [parent#228] UnknownPartitioning(0)
         :           +- *BroadcastHashJoin [id#266], [son#227], Inner, BuildRight UnknownPartitioning(0)
         :              :- *Project [_1#264 AS id#266] UnknownPartitioning(0)
         :              :  +- *Filter isnotnull(_1#264) UnknownPartitioning(0)
         :              :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple1, true])._1, true) AS _1#264] UnknownPartitioning(0)
         :              :        +- Scan ExternalRDDScan[obj#263] UnknownPartitioning(0)
         :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) BroadcastPartitioning(HashedRelationBroadcastMode(List(input[0, string, false])))
         :                 +- *Filter (isnotnull(son#227) && isnotnull(parent#228)) UnknownPartitioning(0)
         :                    +- InMemoryTableScan [son#227, parent#228], [isnotnull(son#227), isnotnull(parent#228)] UnknownPartitioning(0)
         :                          +- InMemoryRelation [son#227, parent#228], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), Statistics(sizeInBytes=148.0 B, hints=none)
         :                                +- *Project [_1#224 AS son#227, _2#225 AS parent#228] UnknownPartitioning(0)
         :                                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#224, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#225] UnknownPartitioning(0)
         :                                      +- Scan ExternalRDDScan[obj#223] UnknownPartitioning(0)
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) BroadcastPartitioning(HashedRelationBroadcastMode(List(input[0, string, false])))
            +- *Filter isnotnull(son#227) UnknownPartitioning(0)
               +- InMemoryTableScan [son#227, parent#228], [isnotnull(son#227)] UnknownPartitioning(0)
                     +- InMemoryRelation [son#227, parent#228], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), Statistics(sizeInBytes=148.0 B, hints=none)
                           +- *Project [_1#224 AS son#227, _2#225 AS parent#228] UnknownPartitioning(0)
                              +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#224, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#225] UnknownPartitioning(0)
                                 +- Scan ExternalRDDScan[obj#223] UnknownPartitioning(0)

and after the patch:

*HashAggregate(keys=[parent#228], functions=[], output=[level2#274]) UnknownPartitioning(5)
+- Exchange hashpartitioning(parent#228, 5) hashpartitioning(parent#228, 5)
   +- *HashAggregate(keys=[parent#228], functions=[], output=[parent#228]) UnknownPartitioning(5)
      +- *Project [parent#228] UnknownPartitioning(5)
         +- *BroadcastHashJoin [level1#270], [son#227], Inner, BuildRight UnknownPartitioning(5)
            :- *HashAggregate(keys=[parent#228], functions=[], output=[level1#270]) UnknownPartitioning(5)
            :  +- Exchange hashpartitioning(parent#228, 5) hashpartitioning(parent#228, 5)
            :     +- *HashAggregate(keys=[parent#228], functions=[], output=[parent#228]) UnknownPartitioning(0)
            :        +- *Project [parent#228] UnknownPartitioning(0)
            :           +- *BroadcastHashJoin [id#266], [son#227], Inner, BuildRight UnknownPartitioning(0)
            :              :- *Project [_1#264 AS id#266] UnknownPartitioning(0)
            :              :  +- *Filter isnotnull(_1#264) UnknownPartitioning(0)
            :              :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple1, true])._1, true) AS _1#264] UnknownPartitioning(0)
            :              :        +- Scan ExternalRDDScan[obj#263] UnknownPartitioning(0)
            :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) BroadcastPartitioning(HashedRelationBroadcastMode(List(input[0, string, false])))
            :                 +- *Filter (isnotnull(son#227) && isnotnull(parent#228)) UnknownPartitioning(0)
            :                    +- InMemoryTableScan [son#227, parent#228], [isnotnull(son#227), isnotnull(parent#228)] UnknownPartitioning(0)
            :                          +- InMemoryRelation [son#227, parent#228], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), Statistics(sizeInBytes=148.0 B, hints=none)
            :                                +- *Project [_1#224 AS son#227, _2#225 AS parent#228] UnknownPartitioning(0)
            :                                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#224, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#225] UnknownPartitioning(0)
            :                                      +- Scan ExternalRDDScan[obj#223] UnknownPartitioning(0)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) BroadcastPartitioning(HashedRelationBroadcastMode(List(input[0, string, false])))
               +- *Filter isnotnull(son#227) UnknownPartitioning(0)
                  +- InMemoryTableScan [son#227, parent#228], [isnotnull(son#227)] UnknownPartitioning(0)
                        +- InMemoryRelation [son#227, parent#228], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), Statistics(sizeInBytes=148.0 B, hints=none)
                              +- *Project [_1#224 AS son#227, _2#225 AS parent#228] UnknownPartitioning(0)
                                 +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#224, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#225] UnknownPartitioning(0)
                                    +- Scan ExternalRDDScan[obj#223] UnknownPartitioning(0)

Note that there is now an Exchange inserted between the two top-level HashAggregate operators.

@SparkQA commented Jul 21, 2017

Test build #79822 has finished for PR 18697 at commit b05e630.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 21, 2017

Test build #79823 has finished for PR 18697 at commit cd5aa80.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aray (Contributor Author) commented Jul 21, 2017

retest this please

@SparkQA commented Jul 21, 2017

Test build #79836 has finished for PR 18697 at commit cd5aa80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aray changed the title from "[SPARK-16683][SQL] Repeated joins to same table can leak attributes via partitioning" to "[SPARK-16683][SQL] Repeated joins to same table can leak attributes via partitioning giving incorrect results" on Jul 25, 2017
@aray (Contributor Author) commented Jul 25, 2017

ping @rxin, can someone look at this correctness fix?

@rxin (Contributor) commented Jul 25, 2017

cc @cloud-fan @hvanhovell

@gatorsmile (Member) commented

I will review this next week.

@viirya (Member) commented Jul 31, 2017

I'd like to reword the problem description, as the current one looks obscure to me.

Currently we don't care whether the output partitioning of an operator contains attributes that are not in its output. For example, the output partitioning of Project [b, c] can be hash partitioning on [a]. This is possible because the plan may come from:

Project [b, c]
  Project [a, b, c] // output partitioning is hash partition of [a]
    Exchange hashpartitioning(a, 5)
      ...

I've noticed this and raised questions about it before. The answer I got was that it does no harm, so it was never fixed.

However, this PR finds a case where it can cause a problem:

BroadcastHashJoin [a, b, c] // now the output partition is wrongly hash partition of [a]...
  Project [c] // a isn't in output, but we don't change output partitioning
    Project [a, b, c] // output partitioning is hash partition of [a]
      Exchange hashpartitioning(a, 5)
  BroadcastExchange [...]
    Project [a, b] // output partitioning is hash partition of [b]
      ...
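
Continuing the toy model from the sketch in the PR description (same simplified Partitioning types), the mechanism behind this is that unary operators forward the child's partitioning verbatim:

// Toy operators, not Spark's classes. Project passes through
// child.outputPartitioning unchanged, even when the projection drops
// the attribute the child was hash-partitioned on.
sealed trait Operator {
  def output: Set[String]
  def outputPartitioning: Partitioning
}
case class Exchange(attrs: Seq[String], n: Int, child: Operator) extends Operator {
  def output: Set[String] = child.output
  def outputPartitioning: Partitioning = HashPartitioning(attrs, n)
}
case class Project(projectList: Set[String], child: Operator) extends Operator {
  def output: Set[String] = projectList
  // The problematic pass-through: nothing checks that the hashed
  // attributes survive the projection.
  def outputPartitioning: Partitioning = child.outputPartitioning
}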


override def verboseStringWithSuffix: String = {
  s"$verboseString $outputPartitioning"
}
Review comment (Member):

Except for debugging this, do we really need to print out output partitioning always?

Reply (Contributor Author):

This doesn't change anything that is in common use; one has to call plan.treeString(verbose = true, addSuffix = true) to get it. I would argue for keeping it for any future debugging.
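
For reference, a usage sketch (the treeString call is the one quoted above; the query is a hypothetical stand-in using the "base" view registered in the test below):

// Prints each operator with its output partitioning appended as a
// suffix, as in the plans pasted earlier in this thread.
val plan = spark.sql("SELECT parent FROM base GROUP BY parent").queryExecution.executedPlan
println(plan.treeString(verbose = true, addSuffix = true))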

base.createOrReplaceTempView("base")

val dist1 = spark.sql("""
SELECT parent level1
Review comment (Member):

Please fix the code indent.

// dist1.count() // or put a count here

val dist2 = spark.sql("""
SELECT parent level2
Review comment (Member):

ditto.

@viirya (Member) commented Jul 31, 2017

A different view of this problem comes from the following part of the query plan:

     :- *HashAggregate(keys=[parent#228], functions=[], output=[level1#270]) hashpartitioning(parent#228, 5)
     :  +- Exchange hashpartitioning(parent#228, 5) hashpartitioning(parent#228, 5)
     :     +- *HashAggregate(keys=[parent#228], functions=[], output=[parent#228]) UnknownPartitioning(0)
     :        +- *Project [parent#228] UnknownPartitioning(0)

At the top HashAggregate, when the output aliases the attribute parent to level1, the output partitioning hashpartitioning(parent, 5) is not updated accordingly to hashpartitioning(level1, 5).

If we updated HashAggregate's output partitioning along with its output, this query would return correct results.

I think this is also an alternative solution. @aray What do you think?

Instead of replacing the original output partitioning with UnknownPartitioning, it would be better to rewrite it with the aliased attributes, so we keep a correct output partitioning and avoid a redundant exchange.
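
A sketch of that alternative on the same simplified types as the earlier sketches (Catalyst would track aliases via expressions and exprIds rather than a string map):

// Toy version of the suggested rewrite, not a Spark API. Rename the
// partitioning's attributes through the operator's alias map
// (e.g. parent -> level1); fall back to UnknownPartitioning only when
// a referenced attribute is dropped entirely.
def rewriteWithAliases(
    p: Partitioning,
    aliases: Map[String, String],
    output: Set[String]): Partitioning = p match {
  case HashPartitioning(attrs, n) =>
    val renamed = attrs.map(a => aliases.getOrElse(a, a))
    if (renamed.forall(output.contains)) HashPartitioning(renamed, n)
    else UnknownPartitioning(n)
  case other => other
}

// rewriteWithAliases(HashPartitioning(Seq("parent"), 5), Map("parent" -> "level1"), Set("level1"))
// returns HashPartitioning(Seq("level1"), 5), so no extra exchange is needed.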

@aray (Contributor Author) commented Jul 31, 2017

@viirya We could certainly make that improvement. I believe it would be a fairly trivial change to this PR if we were just considering expressions that have the same canonical representation. However, for reasons that are not clear to me, an alias does not automatically have the same canonical representation, since the exprId is not copied (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L984). Can anyone enlighten me as to why this is the case?
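
For context, a toy illustration of the behavior in question (Attr and the id counter are simplified stand-ins for Catalyst's AttributeReference/ExprId; the fresh-id-on-alias behavior is what the linked line shows):

import java.util.concurrent.atomic.AtomicLong

// Aliasing mints a new id instead of copying the wrapped attribute's,
// and canonical comparison goes by id, so an alias never compares
// semantically equal to the attribute it renames.
object Ids { private val c = new AtomicLong(); def next(): Long = c.getAndIncrement() }
case class Attr(name: String, exprId: Long)

def alias(a: Attr, newName: String): Attr = Attr(newName, Ids.next())

// val parent = Attr("parent", Ids.next())
// alias(parent, "level1").exprId != parent.exprId  // true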

@SparkQA commented Jul 31, 2017

Test build #80080 has finished for PR 18697 at commit f41811f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…6683

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@SparkQA commented Aug 31, 2017

Test build #81284 has finished for PR 18697 at commit 70a7268.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* attributes. If the partitioning is an [[Expression]] then the attributes that it depends on
* must be in the outputSet otherwise the attribute leaks.
*/
def restrict(outputSet: AttributeSet): Partitioning = this match {
Review comment from @gatorsmile (Member), Aug 31, 2017:

We are refactoring the concepts of distribution and partitioning in PR #19080.

Could you provide your inputs in that PR first? Thanks!

@SparkQA commented Aug 31, 2017

Test build #81286 has finished for PR 18697 at commit 0f21237.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented

Shouldn't we fix ProjectExec.outputPartitioning?

@viirya (Member) commented Sep 1, 2017

If we have a correct outputPartitioning on each operator, I think this issue can be fixed.

@maropu (Member) commented Jul 23, 2018

@aray Can you close this for now, since it has not been active for a long time? (I'm not sure whether current master still has this issue, so you should check that first.)

@HyukjinKwon (Member) commented

Let's close this then.
