Commit 9c87892
[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in SMJ
This patch renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen.

This is an updated version of #7408.

Author: Josh Rosen <[email protected]>

Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits:

e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
1 parent dac090d commit 9c87892
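
For orientation, a minimal sketch (not part of the commit) contrasting the two ways an `InternalRow` ordering is built after this rename, using only constructors that appear in the diffs below:

import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.types.IntegerType

// One ascending sort key over column 0 of the input row.
val order = new SortOrder(BoundReference(0, IntegerType, nullable = true), Ascending) :: Nil

// Interpreted: a plain Ordering[InternalRow]; serializable, so it can be created
// on the driver and shipped to executors (e.g. inside a RangePartitioner).
val interpreted = new InterpretedOrdering(order)

// Generated: compiles a specialized comparator for speed, but the compiled class
// cannot be serialized, so it must be instantiated where it is used.
val generated = GenerateOrdering.generate(order)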

12 files changed: +55 −36 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Lines changed: 2 additions & 2 deletions
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {

   override def nullable: Boolean = left.nullable && right.nullable

-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {

   override def nullable: Boolean = left.nullable && right.nullable

-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala

Lines changed: 2 additions & 2 deletions
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)

-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)

-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
Lines changed: 15 additions & 12 deletions

@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 /**
  * An interpreted row ordering comparator.
  */
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {

   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
     this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -49,9 +49,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
       case dt: AtomicType if order.direction == Descending =>
         dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
       case s: StructType if order.direction == Ascending =>
-        s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+        s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
       case s: StructType if order.direction == Descending =>
-        s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+        s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
       case other =>
         throw new IllegalArgumentException(s"Type $other does not support ordered operations")
     }
@@ -65,6 +65,18 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
   }
 }

+object InterpretedOrdering {
+
+  /**
+   * Creates a [[InterpretedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
+    new InterpretedOrdering(dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    })
+  }
+}
+
 object RowOrdering {

   /**
@@ -81,13 +93,4 @@ object RowOrdering {
   * Returns true iff outputs from the expressions can be ordered.
   */
  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
-
-  /**
-   * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
-   */
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-  }
 }
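
A small usage sketch (not from the commit) of the relocated `forSchema` factory; it assumes the `InternalRow(...)` varargs factory from this era of catalyst:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.InterpretedOrdering
import org.apache.spark.sql.types.{IntegerType, LongType}

// Natural ascending ordering over a two-column (Int, Long) schema.
val ordering = InterpretedOrdering.forSchema(Seq(IntegerType, LongType))

// Assumption: InternalRow.apply(values: Any*) builds a generic row.
val smaller = InternalRow(1, 10L)
val larger = InternalRow(2, 0L)
assert(ordering.compare(smaller, larger) < 0) // first column decides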

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 4 additions & 4 deletions
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryComparison {

   override def symbol: String = "<"

-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)

   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
 }
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {

   override def symbol: String = "<="

-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)

   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
 }
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {

   override def symbol: String = ">"

-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)

   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
 }
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {

   override def symbol: String = ">="

-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)

   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -54,10 +54,10 @@ object TypeUtils {
   def getNumeric(t: DataType): Numeric[Any] =
     t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]

-  def getOrdering(t: DataType): Ordering[Any] = {
+  def getInterpretedOrdering(t: DataType): Ordering[Any] = {
     t match {
       case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
+      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
     }
   }
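
A quick sketch (not from the commit) of the renamed entry point, showing that it dispatches uniformly over atomic and struct types:

import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Atomic type: delegates to the type's own ordering.
val intOrdering = TypeUtils.getInterpretedOrdering(IntegerType)

// Struct type: delegates to the struct's cached interpretedOrdering (see the
// StructType.scala hunk below), which compares field by field.
val structOrdering = TypeUtils.getInterpretedOrdering(
  StructType(Seq(StructField("a", IntegerType), StructField("b", StringType))))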

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._

 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute, InterpretedOrdering$}


 /**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
     StructType(newFields)
   }

-  private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
+  private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 }

 object StructType extends AbstractDataType {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   // GenerateOrdering agrees with RowOrdering.
   (DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
     test(s"GenerateOrdering with $dataType") {
-      val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
+      val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
       val genOrdering = GenerateOrdering.generate(
         BoundReference(0, dataType, nullable = true).asc ::
         BoundReference(1, dataType, nullable = true).asc :: Nil)

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 4 additions & 1 deletion
@@ -156,7 +156,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+        // We need to use an interpreted ordering here because generated orderings cannot be
+        // serialized and this ordering needs to be created on the driver in order to be passed into
+        // Spark core code.
+        implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
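
The comment in this hunk is the reason the rename matters operationally. A throwaway sketch of the property it relies on, using plain JDK serialization; `assertSerializable` is a hypothetical helper, not Spark API:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Throws NotSerializableException for objects that cannot be shipped to executors
// (e.g. a codegen'd comparator wrapping compiled classes); succeeds for a plain
// class such as InterpretedOrdering.
def assertSerializable(obj: AnyRef): Unit = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(obj) finally out.close()
}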

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 12 additions & 2 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.DataType

 object SparkPlan {
   protected[sql] val currentContext = new ThreadLocal[SQLContext]()
@@ -309,13 +310,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
           throw e
         } else {
           log.error("Failed to generate ordering, fallback to interpreted", e)
-          new RowOrdering(order, inputSchema)
+          new InterpretedOrdering(order, inputSchema)
         }
       }
     } else {
-      new RowOrdering(order, inputSchema)
+      new InterpretedOrdering(order, inputSchema)
     }
   }
+  /**
+   * Creates a row ordering for the given schema, in natural ascending order.
+   */
+  protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
+    val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    }
+    newOrdering(order, Seq.empty)
+  }
 }

 private[sql] trait LeafNode extends SparkPlan {
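
The SortMergeJoin hunk itself is not shown in this excerpt, but per the commit title it now obtains its ordering through these helpers. An illustrative fragment, assuming it sits inside a SparkPlan subclass with `leftKeys: Seq[Expression]` in scope (the names are not necessarily those in the commit):

// Ask the SparkPlan helper for the key ordering instead of constructing
// InterpretedOrdering directly, so a generated comparator is used when codegen
// is enabled and the interpreted one remains the fallback.
val keyOrdering: Ordering[InternalRow] =
  newNaturalAscendingOrdering(leftKeys.map(_.dataType))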

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 3 additions & 1 deletion
@@ -212,7 +212,9 @@ case class TakeOrderedAndProject(

   override def outputPartitioning: Partitioning = SinglePartition

-  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
+  // We need to use an interpreted ordering here because generated orderings cannot be serialized
+  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
+  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)

   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
