@cloud-fan
Contributor

@cloud-fan cloud-fan commented Jun 19, 2018

What changes were proposed in this pull request?

In #19080 we simplified the distribution/partitioning framework and made all join-like operators require HashClusteredDistribution from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider:

val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)

The physical plan is

*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]

The left table is hash partitioned by a, b, while the right table is hash partitioned by b. This means two matching records can end up in different partitions, so a row that should be in the output is missing.
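To make the mismatch concrete, here is a minimal sketch in plain Scala with no Spark dependency. `partitionOf` is a hypothetical toy hash, not Spark's Murmur3-based hash partitioning; it is only meant to show that hashing on `(a, b)` and hashing on `b` alone generally assign the same logical row to different partition ids.

```scala
object PartitionMismatchSketch {
  // Toy stand-in for hash partitioning (an assumption for illustration;
  // Spark itself uses Murmur3). The partition id depends on which keys are hashed.
  def partitionOf(keys: Seq[Int], numPartitions: Int): Int = {
    val h = keys.foldLeft(17)((acc, k) => 31 * acc + k)
    Math.floorMod(h, numPartitions)
  }

  // Values whose left row (hashed on a and b) and right row (hashed on b only)
  // land in different partitions, with a = value and b = value * 2 as in the example.
  def mismatched(values: Seq[Int], numPartitions: Int): Seq[Int] =
    values.filter { v =>
      val (a, b) = (v, v * 2)
      partitionOf(Seq(a, b), numPartitions) != partitionOf(Seq(b), numPartitions)
    }

  def main(args: Array[String]): Unit = {
    val lost = mismatched(1 to 10, numPartitions = 5)
    println(s"values whose matching rows sit in different partitions: $lost")
  }
}
```

With this toy hash and 5 partitions, only the values 5 and 10 happen to agree on a partition id; every other value in 1 to 10 puts its two matching rows in different partitions, so a per-partition join would never see them together.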

How was this patch tested?

N/A

@cloud-fan
Contributor Author

cc @tdas @zsxwing

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92055 has finished for PR 21587 at commit 1f3d9df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HashClusteredDistribution(

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92056 has finished for PR 21587 at commit b69a727.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HashClusteredDistribution(

Contributor

@HeartSaVioR HeartSaVioR left a comment

LGTM

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92072 has finished for PR 21587 at commit d102da3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class ClusteredDistributionBase(exprs: Seq[Expression]) extends Distribution
  • case class ClusteredDistribution(
  • case class HashClusteredDistribution(

@bogdanrdc
Contributor

Maybe also fix SinglePartition.satisfies. It only checks for ClusteredDistribution and defaults to true otherwise. Luckily, SinglePartition.numPartitions is 1, so EnsureRequirements will still add a shuffle to make the numPartitions match.

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92094 has finished for PR 21587 at commit 6fc7913.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  val numPartitions = 1

- override def satisfies(required: Distribution): Boolean = required match {
+ override def satisfies0(required: Distribution): Boolean = required match {
Contributor

@tdas tdas Jun 20, 2018

Can we add docs to explain what satisfies0 is and how it differs from satisfies?
Otherwise it's quite confusing.
When does one override satisfies, and when does one override satisfies0?

Contributor Author

added in the base class
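For readers following the thread, the split under discussion can be sketched roughly as below. This is a simplified toy model, not the code in this patch: the string-keyed `Distribution` and `Partitioning` types are stand-ins, and the exact checks are assumptions. The idea is that `satisfies` is a final entry point holding the checks shared by every partitioning, while subclasses override only `satisfies0`.

```scala
object SatisfiesSketch {
  // Toy distributions keyed by column names (assumption: real code uses expressions).
  sealed trait Distribution { def requiredNumPartitions: Option[Int] }
  case object UnspecifiedDistribution extends Distribution {
    val requiredNumPartitions: Option[Int] = None
  }
  final case class ClusteredDistribution(
      keys: Seq[String], requiredNumPartitions: Option[Int] = None) extends Distribution
  final case class HashClusteredDistribution(
      keys: Seq[String], requiredNumPartitions: Option[Int] = None) extends Distribution

  trait Partitioning {
    def numPartitions: Int

    // Entry point for callers: shared checks first, then the subclass-specific part.
    final def satisfies(required: Distribution): Boolean =
      required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)

    // Extension point: subclasses override this one, never `satisfies`.
    protected def satisfies0(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case _ => false
    }
  }

  final case class HashPartitioning(keys: Seq[String], numPartitions: Int)
      extends Partitioning {
    override protected def satisfies0(required: Distribution): Boolean =
      super.satisfies0(required) || (required match {
        // Strict: the hashed keys must be exactly the required keys, in order.
        case HashClusteredDistribution(k, _) => k == keys
        // Loose: hashing on any subset of the clustering keys is enough.
        case ClusteredDistribution(k, _) => keys.forall(k.contains)
        case _ => false
      })
  }

  def main(args: Array[String]): Unit = {
    val byB = HashPartitioning(Seq("b"), 5)
    println(byB.satisfies(ClusteredDistribution(Seq("a", "b"))))
    println(byB.satisfies(HashClusteredDistribution(Seq("a", "b"))))
  }
}
```

In this model, a partitioning on `b` alone satisfies `ClusteredDistribution(a, b)` but not `HashClusteredDistribution(a, b)`, which mirrors why a join that requires only the looser distribution would not get the extra shuffle it needs.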

* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
* number of partitions, this distribution strictly requires which partition the tuple should be in.
*/
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
Contributor

I do not see any new tests in DistributionSuite. I feel that issues like this should have specific unit tests in DistributionSuite and shouldn't have to rely on StreamingJoinSuite.

Contributor Author

I've reorganized this test suite and added a bunch of new test cases, to improve the test coverage.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92117 has finished for PR 21587 at commit 0795e40.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92119 has finished for PR 21587 at commit 08da2e6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92123 has finished for PR 21587 at commit 08da2e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2018

Test build #92177 has finished for PR 21587 at commit 72466b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ClusteredDistribution(

@gatorsmile
Member

LGTM

Thanks! Merged to master/2.3

asfgit pushed a commit that referenced this pull request Jun 21, 2018
…ning from children

Author: Wenchen Fan

Closes #21587 from cloud-fan/join.

(cherry picked from commit dc8a6be)
Signed-off-by: Xiao Li
@asfgit asfgit closed this in dc8a6be Jun 21, 2018
cloud-fan pushed a commit that referenced this pull request Jun 9, 2021
### What changes were proposed in this pull request?

The changed [unit test](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L566) was introduced in #21587 to fix the planner side of stream-stream join. Ideally, checking the query result should catch the bug, but it is better to also add a plan check, which makes the purpose of the unit test clearer and catches future bugs from planner changes.

### Why are the changes needed?

Improve unit test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed test itself.

Closes #32836 from c21/ss-test.

Authored-by: Cheng Su
Signed-off-by: Wenchen Fan
@HeartSaVioR
Contributor

In retrospect: we have to use HashClusteredDistribution for all stateful operators, but we only addressed stream-stream join here and missed the others. (Even though I reviewed this PR at the time.)
