Skip to content

Commit be19a0f

Browse files
committed
[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
1 parent eb5b8f4 commit be19a0f

File tree

12 files changed

+53
-35
lines changed

12 files changed

+53
-35
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
320320

321321
override def nullable: Boolean = left.nullable && right.nullable
322322

323-
private lazy val ordering = TypeUtils.getOrdering(dataType)
323+
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
324324

325325
override def eval(input: InternalRow): Any = {
326326
val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
374374

375375
override def nullable: Boolean = left.nullable && right.nullable
376376

377-
private lazy val ordering = TypeUtils.getOrdering(dataType)
377+
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
378378

379379
override def eval(input: InternalRow): Any = {
380380
val input1 = left.eval(input)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
319319
override def nullable: Boolean = children.forall(_.nullable)
320320
override def foldable: Boolean = children.forall(_.foldable)
321321

322-
private lazy val ordering = TypeUtils.getOrdering(dataType)
322+
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
323323

324324
override def checkInputDataTypes(): TypeCheckResult = {
325325
if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
374374
override def nullable: Boolean = children.forall(_.nullable)
375375
override def foldable: Boolean = children.forall(_.foldable)
376376

377-
private lazy val ordering = TypeUtils.getOrdering(dataType)
377+
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
378378

379379
override def checkInputDataTypes(): TypeCheckResult = {
380380
if (children.length <= 1) {
Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
2424
/**
2525
* An interpreted row ordering comparator.
2626
*/
27-
class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
27+
class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
2828

2929
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
3030
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -49,9 +49,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
4949
case dt: AtomicType if order.direction == Descending =>
5050
dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
5151
case s: StructType if order.direction == Ascending =>
52-
s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
52+
s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
5353
case s: StructType if order.direction == Descending =>
54-
s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
54+
s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
5555
case other =>
5656
throw new IllegalArgumentException(s"Type $other does not support ordered operations")
5757
}
@@ -65,6 +65,18 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
6565
}
6666
}
6767

68+
object InterpretedOrdering {
69+
70+
/**
71+
* Creates a [[InterpretedOrdering]] for the given schema, in natural ascending order.
72+
*/
73+
def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
74+
new InterpretedOrdering(dataTypes.zipWithIndex.map {
75+
case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
76+
})
77+
}
78+
}
79+
6880
object RowOrdering {
6981

7082
/**
@@ -81,13 +93,4 @@ object RowOrdering {
8193
* Returns true iff outputs from the expressions can be ordered.
8294
*/
8395
def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
84-
85-
/**
86-
* Creates a [[RowOrdering]] for the given schema, in natural ascending order.
87-
*/
88-
def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
89-
new RowOrdering(dataTypes.zipWithIndex.map {
90-
case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
91-
})
92-
}
9396
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryCompariso
376376

377377
override def symbol: String = "<"
378378

379-
private lazy val ordering = TypeUtils.getOrdering(left.dataType)
379+
private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
380380

381381
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
382382
}
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
388388

389389
override def symbol: String = "<="
390390

391-
private lazy val ordering = TypeUtils.getOrdering(left.dataType)
391+
private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
392392

393393
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
394394
}
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryCompar
400400

401401
override def symbol: String = ">"
402402

403-
private lazy val ordering = TypeUtils.getOrdering(left.dataType)
403+
private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
404404

405405
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
406406
}
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
412412

413413
override def symbol: String = ">="
414414

415-
private lazy val ordering = TypeUtils.getOrdering(left.dataType)
415+
private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
416416

417417
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
418418
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ object TypeUtils {
5454
def getNumeric(t: DataType): Numeric[Any] =
5555
t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
5656

57-
def getOrdering(t: DataType): Ordering[Any] = {
57+
def getInterpretedOrdering(t: DataType): Ordering[Any] = {
5858
t match {
5959
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
60-
case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
60+
case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
6161
}
6262
}
6363

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
2424

2525
import org.apache.spark.SparkException
2626
import org.apache.spark.annotation.DeveloperApi
27-
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
27+
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute, InterpretedOrdering$}
2828

2929

3030
/**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
301301
StructType(newFields)
302302
}
303303

304-
private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
304+
private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
305305
}
306306

307307
object StructType extends AbstractDataType {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
5454
// GenerateOrdering agrees with RowOrdering.
5555
(DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
5656
test(s"GenerateOrdering with $dataType") {
57-
val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
57+
val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
5858
val genOrdering = GenerateOrdering.generate(
5959
BoundReference(0, dataType, nullable = true).asc ::
6060
BoundReference(1, dataType, nullable = true).asc :: Nil)

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
156156
val mutablePair = new MutablePair[InternalRow, Null]()
157157
iter.map(row => mutablePair.update(row.copy(), null))
158158
}
159-
implicit val ordering = new RowOrdering(sortingExpressions, child.output)
159+
// We need to use an interpreted ordering here because generated orderings cannot be
160+
// serialized and this ordering needs to be created on the driver in order to be passed into
161+
// Spark core code.
162+
implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
160163
new RangePartitioner(numPartitions, rddForSampling, ascending = true)
161164
case SinglePartition =>
162165
new Partitioner {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution
1919

2020
import java.util.concurrent.atomic.AtomicBoolean
2121

22+
import org.apache.spark.sql.types.DataType
23+
2224
import scala.collection.mutable.ArrayBuffer
2325

2426
import org.apache.spark.{Accumulator, Logging}
@@ -309,12 +311,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
309311
throw e
310312
} else {
311313
log.error("Failed to generate ordering, fallback to interpreted", e)
312-
new RowOrdering(order, inputSchema)
314+
new InterpretedOrdering(order, inputSchema)
313315
}
314316
}
315317
} else {
316-
new RowOrdering(order, inputSchema)
318+
new InterpretedOrdering(order, inputSchema)
319+
}
320+
}
321+
/**
322+
* Creates a row ordering for the given schema, in natural ascending order.
323+
*/
324+
protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
325+
val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
326+
case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
317327
}
328+
newOrdering(order, Seq.empty)
318329
}
319330
}
320331

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,9 @@ case class TakeOrderedAndProject(
212212

213213
override def outputPartitioning: Partitioning = SinglePartition
214214

215-
private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
215+
// We need to use an interpreted ordering here because generated orderings cannot be serialized
216+
// and this ordering needs to be created on the driver in order to be passed into Spark core code.
217+
private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
216218

217219
// TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
218220
@transient private val projection = projectList.map(new InterpretedProjection(_, child.output))

0 commit comments

Comments
 (0)