
Commit a1d417b

Always use nullSafe version partitioning when creating Exchanges.
1 parent fce9053 commit a1d417b

3 files changed: +56 -1 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 2 additions & 0 deletions
@@ -98,6 +98,8 @@ sealed trait Partitioning {
    * Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
    * guarantees the same partitioning scheme described by `other`.
    *
+   * If a [[Partitioning]] supports the `nullSafe` setting, the nullSafe version of this
+   * [[Partitioning]] should always guarantee its nullUnsafe version.
    * For example, HashPartitioning(expressions = 'a, numPartitions = 10, nullSafe = true)
    * guarantees HashPartitioning(expressions = 'a, numPartitions = 10, nullSafe = false).
    * However, HashPartitioning(expressions = 'a, numPartitions = 10, nullSafe = false) does not
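
To make the documented relationship concrete, here is a minimal sketch that mirrors the HashPartitioning example quoted in the Scaladoc above; the constructor shape is taken from that comment and may not match the exact signature in this revision:

    // Sketch only -- mirrors the example in the Scaladoc above; not code from this commit.
    val nullSafePart   = HashPartitioning(expressions = 'a, numPartitions = 10, nullSafe = true)
    val nullUnsafePart = HashPartitioning(expressions = 'a, numPartitions = 10, nullSafe = false)

    // The nullSafe variant satisfies the nullUnsafe requirement, but not the other way around.
    assert(nullSafePart.guarantees(nullUnsafePart))
    assert(!nullUnsafePart.guarantees(nullSafePart))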

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 6 additions & 1 deletion
@@ -213,7 +213,12 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[

     def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
       if (!child.outputPartitioning.guarantees(partitioning)) {
-        Exchange(partitioning, child)
+        // If the child's outputPartitioning does not guarantee partitioning,
+        // we need to add an Exchange operator. Here, we always use
+        // the nullSafe version of the given partitioning because the nullSafe
+        // version always guarantees the nullUnsafe version of the partitioning and
+        // we do not have any special handling for nullUnsafe partitioning for now.
+        Exchange(partitioning.withNullSafeSetting(newNullSafe = true), child)
       } else {
         child
       }
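
withNullSafeSetting itself is not shown in this diff; the following is a hedged, standalone sketch of what such a method could look like on a partitioning that carries a nullSafe flag. The class name and fields here are simplified stand-ins, not the actual Spark classes:

    // Hypothetical, simplified stand-in for illustration only.
    case class ExampleHashPartitioning(
        expressions: Seq[String],
        numPartitions: Int,
        nullSafe: Boolean) {

      // Return a copy of this partitioning with the requested nullSafe setting,
      // which is how EnsureRequirements above would always obtain the nullSafe version.
      def withNullSafeSetting(newNullSafe: Boolean): ExampleHashPartitioning =
        copy(nullSafe = newNullSafe)
    }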

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -210,6 +210,54 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
         }
         i += 1
       }
+
+      {
+        val numExchanges: Int = sql(
+          s"""
+            |SELECT small.key, count(*)
+            |FROM
+            |  normal JOIN small ON (normal.key = small.key)
+            |  JOIN tiny ON (small.key = tiny.key)
+            |GROUP BY
+            |  small.key
+          """.stripMargin
+        ).queryExecution.executedPlan.collect {
+          case exchange: Exchange => exchange
+        }.length
+        assert(numExchanges === 3)
+      }
+
+      {
+        val numExchanges: Int = sql(
+          s"""
+            |SELECT normal.key, count(*)
+            |FROM
+            |  normal LEFT OUTER JOIN small ON (normal.key = small.key)
+            |  JOIN tiny ON (small.key = tiny.key)
+            |GROUP BY
+            |  normal.key
+          """.stripMargin
+        ).queryExecution.executedPlan.collect {
+          case exchange: Exchange => exchange
+        }.length
+        assert(numExchanges === 3)
+      }
+
+      {
+        val numExchanges: Int = sql(
+          s"""
+            |SELECT small.key, count(*)
+            |FROM
+            |  normal LEFT OUTER JOIN small ON (normal.key = small.key)
+            |  JOIN tiny ON (small.key = tiny.key)
+            |GROUP BY
+            |  small.key
+          """.stripMargin
+        ).queryExecution.executedPlan.collect {
+          case exchange: Exchange => exchange
+        }.length
+        assert(numExchanges === 4)
+      }
     }
   }
 }
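
The three test blocks above all follow the same shuffle-counting pattern; condensed into a helper for readability, assuming the suite's sql method is in scope and the normal/small/tiny test tables are registered elsewhere in PlannerSuite:

    // Run a query and count the Exchange operators in its executed physical plan.
    def countExchanges(query: String): Int =
      sql(query).queryExecution.executedPlan.collect {
        case e: Exchange => e
      }.length

With such a helper, each block reduces to a single assert(countExchanges(queryText) === expected) call.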

0 commit comments
