Skip to content

Commit 4f08278

Browse files
committed
Fix the test by adding the compatibility check to EnsureRequirements
1 parent a1c12b9 commit 4f08278

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,9 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
212212
}
213213
}
214214

215-
private def ensureChildNumPartitionsAgreementIfNecessary(operator: SparkPlan): SparkPlan = {
215+
private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = {
216216
if (operator.requiresChildPartitioningsToBeCompatible) {
217-
if (operator.children.map(_.outputPartitioning.numPartitions).distinct.size > 1) {
217+
if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) {
218218
val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
219219
case (child, requiredDistribution) =>
220220
val targetPartitioning = canonicalPartitioning(requiredDistribution)
@@ -271,6 +271,6 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
271271

272272
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
273273
case operator: SparkPlan =>
274-
ensureDistributionAndOrdering(ensureChildNumPartitionsAgreementIfNecessary(operator))
274+
ensureDistributionAndOrdering(ensureChildPartitioningsAreCompatible(operator))
275275
}
276276
}

0 commit comments

Comments
 (0)