Skip to content

Commit 1307c50

Browse files
committed
Update conditions for requiring child compatibility.
1 parent 18cddeb commit 1307c50

File tree

7 files changed: +6 −22 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -222,7 +222,10 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
    * output partitionings and add Exchanges to fix any detected incompatibilities.
    */
   private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = {
-    if (operator.requiresChildPartitioningsToBeCompatible) {
+    // If an operator has multiple children and the operator requires a specific child output
+    // distribution then we need to ensure that all children have compatible output partitionings.
+    if (operator.children.length > 1
+        && operator.requiredChildDistribution.toSet != Set(UnspecifiedDistribution)) {
       if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) {
         val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
           case (child, requiredDistribution) =>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 0 additions & 6 deletions
```diff
@@ -124,12 +124,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
-  /**
-   * Specifies whether this operator requires all of its children to have [[outputPartitioning]]s
-   * that are compatible with each other.
-   */
-  def requiresChildPartitioningsToBeCompatible: Boolean = false
-
   /** Specifies whether this operator outputs UnsafeRows */
   def outputsUnsafeRows: Boolean = false
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -42,8 +42,6 @@ case class LeftSemiJoinHash(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def requiresChildPartitioningsToBeCompatible: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
       if (condition.isEmpty) {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -46,8 +46,6 @@ case class ShuffledHashJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def requiresChildPartitioningsToBeCompatible: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -44,8 +44,6 @@ case class ShuffledHashOuterJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def requiresChildPartitioningsToBeCompatible: Boolean = true
-
   override def outputPartitioning: Partitioning = joinType match {
     case LeftOuter => left.outputPartitioning
     case RightOuter => right.outputPartitioning
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -48,8 +48,6 @@ case class SortMergeJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def requiresChildPartitioningsToBeCompatible: Boolean = true
-
   override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
```

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 2 additions & 7 deletions
```diff
@@ -214,7 +214,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
   // do they satisfy the distribution requirements? As a result, we need at least four test cases.
 
   private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = {
-    if (outputPlan.requiresChildPartitioningsToBeCompatible) {
+    if (outputPlan.children.length > 1
+        && outputPlan.requiredChildDistribution.toSet != Set(UnspecifiedDistribution)) {
       val childPartitionings = outputPlan.children.map(_.outputPartitioning)
       if (!Partitioning.allCompatible(childPartitionings)) {
         fail(s"Partitionings are not compatible: $childPartitionings")
```
```diff
@@ -248,7 +249,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         DummySparkPlan(outputPartitioning = leftPartitioning),
         DummySparkPlan(outputPartitioning = rightPartitioning)
       ),
-      requiresChildPartitioningsToBeCompatible = true,
       requiredChildDistribution = Seq(distribution, distribution),
       requiredChildOrdering = Seq(Seq.empty, Seq.empty)
     )
```
```diff
@@ -269,7 +269,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 1)),
         DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 2))
       ),
-      requiresChildPartitioningsToBeCompatible = true,
       requiredChildDistribution = Seq(distribution, distribution),
       requiredChildOrdering = Seq(Seq.empty, Seq.empty)
     )
```
```diff
@@ -288,7 +287,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         DummySparkPlan(outputPartitioning = childPartitioning),
         DummySparkPlan(outputPartitioning = childPartitioning)
       ),
-      requiresChildPartitioningsToBeCompatible = true,
       requiredChildDistribution = Seq(distribution, distribution),
       requiredChildOrdering = Seq(Seq.empty, Seq.empty)
     )
```
```diff
@@ -309,7 +307,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         DummySparkPlan(outputPartitioning = childPartitioning),
         DummySparkPlan(outputPartitioning = childPartitioning)
       ),
-      requiresChildPartitioningsToBeCompatible = true,
       requiredChildDistribution = Seq(distribution, distribution),
       requiredChildOrdering = Seq(Seq.empty, Seq.empty)
     )
```
```diff
@@ -333,7 +330,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         DummySparkPlan(outputPartitioning = SinglePartition),
         DummySparkPlan(outputPartitioning = SinglePartition)
       ),
-      requiresChildPartitioningsToBeCompatible = true,
       requiredChildDistribution = Seq(distribution, distribution),
       requiredChildOrdering = Seq(outputOrdering, outputOrdering)
     )
```
```diff
@@ -352,7 +348,6 @@ private case class DummySparkPlan(
     override val children: Seq[SparkPlan] = Nil,
     override val outputOrdering: Seq[SortOrder] = Nil,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
-    override val requiresChildPartitioningsToBeCompatible: Boolean = false,
     override val requiredChildDistribution: Seq[Distribution] = Nil,
     override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil
 ) extends SparkPlan {
```

0 commit comments

Comments
 (0)