
Commit 43cce92

[SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join
### What changes were proposed in this pull request?

This PR revives `HashClusteredDistribution` and renames it to `StatefulOpClusteredDistribution` so that the rationale for the distribution is clear from its name. The rename is safe because the class no longer needs to be general: SPARK-35703 moved the usages of `HashClusteredDistribution` over to `ClusteredDistribution`, and stateful operators are the exceptions.

Only a `HashPartitioning` with the same expressions and the same number of partitions can satisfy `StatefulOpClusteredDistribution`. This also means that we cannot modify `HashPartitioning` unless we clone it and assign the clone to `StatefulOpClusteredDistribution`.

This PR documents what stateful operators expect of partitioning in the classdoc of `StatefulOpClusteredDistribution`.

This PR also changes stream-stream join to use `StatefulOpClusteredDistribution` instead of `ClusteredDistribution`. This effectively reverts a part of SPARK-35703 that has not shipped in any release. This PR does not deal with other stateful operators, since that is a long-standing issue (probably Spark 2.2.0+) and we need a plan for dealing with existing state.

### Why are the changes needed?

Spark does not guarantee stable physical partitioning for stateful operators across the lifetime of a query, and because of the relaxed distribution requirement it is hard to predict what the current physical partitioning of the state is. (We expect hash partitioning on the grouping keys, but `ClusteredDistribution` does not guarantee that partitioning; it is much more relaxed.)

This PR enforces that the physical partitioning of stream-stream join operators is hash partitioning on the grouping keys, which is our general expectation for state store partitioning.

### Does this PR introduce _any_ user-facing change?

No, since SPARK-35703 has not been shipped in any release yet.

### How was this patch tested?

Existing tests.

Closes #35419 from HeartSaVioR/SPARK-38124.

Authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
1 parent 5b02a34 commit 43cce92
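
The correctness concern in the commit message can be illustrated with a small, self-contained sketch. It does not use Spark at all: `partitionId` is a toy hash standing in for whatever hash `HashPartitioning` really applies, `StateStores` is a hypothetical stand-in for per-partition state stores, and the join key is assumed to be two integer columns `(a, b)`. The point is only that state written under one physical partitioning is looked up in the wrong partition if the partitioning changes after a restart.

```scala
import scala.collection.mutable

object StatePartitioningSketch {
  type Key = Seq[Int]

  // Toy hash over the partitioning expression values (an assumption for
  // illustration; this is NOT the hash function Spark uses).
  def partitionId(exprValues: Seq[Int], numPartitions: Int): Int =
    Math.floorMod(exprValues.foldLeft(0)((h, v) => 31 * h + v), numPartitions)

  // Hypothetical model: one independent state store per partition.
  final class StateStores(val numPartitions: Int) {
    private val stores = Vector.fill(numPartitions)(mutable.Map.empty[Key, Long])
    def put(partition: Int, key: Key, value: Long): Unit = stores(partition).update(key, value)
    def get(partition: Int, key: Key): Option[Long] = stores(partition).get(key)
  }

  // Returns (lookup with unchanged partitioning, lookup with changed partitioning).
  def demo(): (Option[Long], Option[Long]) = {
    val n = 5
    val stores = new StateStores(n)
    val key: Key = Seq(1, 2) // join key (a = 1, b = 2)

    // First run: rows are hash-partitioned on both key columns (a, b).
    stores.put(partitionId(key, n), key, 42L)

    // Restart with the same partitioning: the state is found.
    val unchanged = stores.get(partitionId(key, n), key)

    // Restart where the input happens to be hash-partitioned on `a` only.
    // That still clusters rows by (a, b), but the row lands in another
    // partition, whose state store has never seen this key.
    val changed = stores.get(partitionId(key.take(1), n), key)

    (unchanged, changed)
  }
}
```

Calling `StatePartitioningSketch.demo()` returns a found value for the unchanged partitioning and `None` for the changed one. No error is raised in the second case, which is why the commit message describes the failure mode as silent.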

3 files changed: 43 additions, 3 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 40 additions & 0 deletions
@@ -90,6 +90,37 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator in Structured Streaming.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] (and HashPartitioning in
+ * [[PartitioningCollection]]) can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(
+    expressions: Seq[Expression],
+    _requiredNumPartitions: Int) extends Distribution {
+  require(
+    expressions != Nil,
+    "The expressions for hash of a StatefulOpClusteredDistribution should not be Nil. " +
+      "An AllTuples should be used to represent a distribution that only has " +
+      "a single partition.")
+
+  override val requiredNumPartitions: Option[Int] = Some(_requiredNumPartitions)
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(_requiredNumPartitions == numPartitions,
+      s"This StatefulOpClusteredDistribution requires ${_requiredNumPartitions} " +
+        s"partitions, but the actual number of partitions is $numPartitions.")
+    HashPartitioning(expressions, numPartitions)
+  }
+}
+
 /**
  * Represents data where tuples have been ordered according to the `ordering`
  * [[Expression Expressions]]. Its requirement is defined as the following:
@@ -200,6 +231,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.
  */
 case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   extends Expression with Partitioning with Unevaluable {
@@ -211,6 +247,10 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def satisfies0(required: Distribution): Boolean = {
     super.satisfies0(required) || {
       required match {
+        case h: StatefulOpClusteredDistribution =>
+          expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
+            case (l, r) => l.semanticEquals(r)
+          }
         case ClusteredDistribution(requiredClustering, _) =>
           expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
         case _ => false
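
The difference between the two `case` branches above may be easier to see in a stripped-down model. The sketch below is not Catalyst code: expressions are plain strings, string equality stands in for `semanticEquals`, and the names `Clustered`, `StatefulOpClustered`, and `HashPart` are hypothetical. It also assumes, following the commit message, that the required number of partitions is compared before the expressions are.

```scala
object SatisfiesSketch {
  sealed trait Distribution { def requiredNumPartitions: Option[Int] }

  // Relaxed requirement: rows with equal clustering values must be co-located.
  final case class Clustered(
      clustering: Seq[String],
      requiredNumPartitions: Option[Int] = None) extends Distribution

  // Strict requirement: exact expressions, exact order, exact partition count.
  final case class StatefulOpClustered(
      expressions: Seq[String],
      numPartitions: Int) extends Distribution {
    require(expressions.nonEmpty, "expressions must not be empty")
    val requiredNumPartitions: Option[Int] = Some(numPartitions)
  }

  final case class HashPart(expressions: Seq[String], numPartitions: Int) {
    def satisfies(required: Distribution): Boolean =
      required.requiredNumPartitions.forall(_ == numPartitions) && (required match {
        case s: StatefulOpClustered =>
          // Pairwise comparison: same length and same expression at each position.
          expressions.length == s.expressions.length &&
            expressions.zip(s.expressions).forall { case (l, r) => l == r }
        case Clustered(clustering, _) =>
          // Any subset of the clustering keys is enough.
          expressions.forall(x => clustering.contains(x))
      })
  }
}
```

Under this model, `HashPart(Seq("a"), 5)` satisfies `Clustered(Seq("a", "b"))` but not `StatefulOpClustered(Seq("a", "b"), 5)`. A partitioning on `Seq("b", "a")` is rejected by the strict requirement as well, because the comparison is positional.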

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala

Lines changed: 2 additions & 2 deletions
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, getStateInfo.numPartitions) ::
+      StatefulOpClusteredDistribution(rightKeys, getStateInfo.numPartitions) :: Nil
 
   override def output: Seq[Attribute] = joinType match {
     case _: InnerLike => left.output ++ right.output
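
The hunk above also switches the partition count from an optional value (`stateInfo.map(_.numPartitions)`) to a mandatory one (`getStateInfo.numPartitions`). A rough sketch of that difference follows. `StateInfo` and both helper functions are hypothetical, and the behavior of failing when no state info is present is an assumption made for illustration, not something the diff itself shows.

```scala
object StateInfoSketch {
  // Hypothetical stand-in for the operator's state information.
  final case class StateInfo(numPartitions: Int)

  // Optional requirement: with no state info there is no partition-count constraint.
  def optionalNumPartitions(stateInfo: Option[StateInfo]): Option[Int] =
    stateInfo.map(_.numPartitions)

  // Mandatory requirement: a concrete count is always produced, or the call fails.
  def mandatoryNumPartitions(stateInfo: Option[StateInfo]): Int =
    stateInfo
      .getOrElse(throw new IllegalStateException("state info is not set"))
      .numPartitions
}
```

With the optional form, a missing state info quietly drops the constraint on the number of partitions. The mandatory form never produces a distribution without a partition count.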

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -571,7 +571,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       CheckNewAnswer((5, 10, 5, 15, 5, 25)))
   }
 
-  test("streaming join should require HashClusteredDistribution from children") {
+  test("streaming join should require StatefulOpClusteredDistribution from children") {
     val input1 = MemoryStream[Int]
     val input2 = MemoryStream[Int]
