Skip to content

Commit 6e897dd

Browse files
committed
hide boundReference from manually construct RowOrdering for key compare in smj
1 parent 8681d73 commit 6e897dd

File tree

2 files changed

+9
-5
lines changed
  • sql
    • catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions
    • core/src/main/scala/org/apache/spark/sql/execution/joins

2 files changed

+9
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20-
import org.apache.spark.sql.types.{StructType, NativeType}
20+
import org.apache.spark.sql.types.{DataType, StructType, NativeType}
2121

2222

2323
/**
@@ -232,3 +232,10 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
232232
return 0
233233
}
234234
}
235+
236+
object RowOrdering {
237+
def getOrderingFromDataTypes(dataTypes: Seq[DataType]): RowOrdering =
238+
new RowOrdering(dataTypes.zipWithIndex.map {
239+
case(dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
240+
})
241+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,8 @@ case class SortMergeJoin(
4646
override def requiredChildDistribution: Seq[Distribution] =
4747
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
4848

49-
private val orders: Seq[SortOrder] = leftKeys.zipWithIndex.map {
50-
case(expr, index) => SortOrder(BoundReference(index, expr.dataType, expr.nullable), Ascending)
51-
}
5249
// this is to manually construct an ordering that can be used to compare keys from both sides
53-
private val keyOrdering: RowOrdering = new RowOrdering(orders)
50+
private val keyOrdering: RowOrdering = RowOrdering.getOrderingFromDataTypes(leftKeys.map(_.dataType))
5451

5552
private def requiredOrders(keys: Seq[Expression], side: SparkPlan): Ordering[Row] =
5653
newOrdering(keys.map(SortOrder(_, Ascending)), side.output)

0 commit comments

Comments
 (0)