Skip to content

Commit 1462bba

Browse files
bersprocketshvanhovell
authored andcommitted
[SPARK-24119][SQL] Add interpreted execution to SortPrefix expression
## What changes were proposed in this pull request? Implemented eval in SortPrefix expression. ## How was this patch tested? - ran existing sbt SQL tests - added unit test - ran existing Python SQL tests - manual tests: disabling codegen -- patching code to disable beyond what spark.sql.codegen.wholeStage=false can do -- and running sbt SQL tests Author: Bruce Robbins <[email protected]> Closes #21231 from bersprockets/sortprefixeval.
1 parent e76b012 commit 1462bba

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2222
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
2323
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
2424
import org.apache.spark.sql.types._
25+
import org.apache.spark.unsafe.types.UTF8String
2526
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._
2627

2728
abstract sealed class SortDirection {
@@ -148,7 +149,41 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
148149
(!child.isAscending && child.nullOrdering == NullsLast)
149150
}
150151

151-
override def eval(input: InternalRow): Any = throw new UnsupportedOperationException
152+
private lazy val calcPrefix: Any => Long = child.child.dataType match {
153+
case BooleanType => (raw) =>
154+
if (raw.asInstanceOf[Boolean]) 1 else 0
155+
case DateType | TimestampType | _: IntegralType => (raw) =>
156+
raw.asInstanceOf[java.lang.Number].longValue()
157+
case FloatType | DoubleType => (raw) => {
158+
val dVal = raw.asInstanceOf[java.lang.Number].doubleValue()
159+
DoublePrefixComparator.computePrefix(dVal)
160+
}
161+
case StringType => (raw) =>
162+
StringPrefixComparator.computePrefix(raw.asInstanceOf[UTF8String])
163+
case BinaryType => (raw) =>
164+
BinaryPrefixComparator.computePrefix(raw.asInstanceOf[Array[Byte]])
165+
case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
166+
_.asInstanceOf[Decimal].toUnscaledLong
167+
case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
168+
val p = Decimal.MAX_LONG_DIGITS
169+
val s = p - (dt.precision - dt.scale)
170+
(raw) => {
171+
val value = raw.asInstanceOf[Decimal]
172+
if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue
173+
}
174+
case dt: DecimalType => (raw) =>
175+
DoublePrefixComparator.computePrefix(raw.asInstanceOf[Decimal].toDouble)
176+
case _ => (Any) => 0L
177+
}
178+
179+
override def eval(input: InternalRow): Any = {
180+
val value = child.child.eval(input)
181+
if (value == null) {
182+
null
183+
} else {
184+
calcPrefix(value)
185+
}
186+
}
152187

153188
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
154189
val childCode = child.child.genCode(ctx)
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import java.sql.{Date, Timestamp}
21+
import java.util.TimeZone
22+
23+
import org.apache.spark.SparkFunSuite
24+
import org.apache.spark.sql.types._
25+
import org.apache.spark.unsafe.types.UTF8String
26+
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._
27+
28+
class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
29+
30+
test("SortPrefix") {
31+
val b1 = Literal.create(false, BooleanType)
32+
val b2 = Literal.create(true, BooleanType)
33+
val i1 = Literal.create(20132983, IntegerType)
34+
val i2 = Literal.create(-20132983, IntegerType)
35+
val l1 = Literal.create(20132983, LongType)
36+
val l2 = Literal.create(-20132983, LongType)
37+
val millis = 1524954911000L;
38+
// Explicitly choose a time zone, since Date objects can create different values depending on
39+
// local time zone of the machine on which the test is running
40+
val oldDefaultTZ = TimeZone.getDefault
41+
val d1 = try {
42+
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
43+
Literal.create(new java.sql.Date(millis), DateType)
44+
} finally {
45+
TimeZone.setDefault(oldDefaultTZ)
46+
}
47+
val t1 = Literal.create(new Timestamp(millis), TimestampType)
48+
val f1 = Literal.create(0.7788229f, FloatType)
49+
val f2 = Literal.create(-0.7788229f, FloatType)
50+
val db1 = Literal.create(0.7788229d, DoubleType)
51+
val db2 = Literal.create(-0.7788229d, DoubleType)
52+
val s1 = Literal.create("T", StringType)
53+
val s2 = Literal.create("This is longer than 8 characters", StringType)
54+
val bin1 = Literal.create(Array[Byte](12), BinaryType)
55+
val bin2 = Literal.create(Array[Byte](12, 17, 99, 0, 0, 0, 2, 3, 0xf4.asInstanceOf[Byte]),
56+
BinaryType)
57+
val dec1 = Literal(Decimal(20132983L, 10, 2))
58+
val dec2 = Literal(Decimal(20132983L, 19, 2))
59+
val dec3 = Literal(Decimal(20132983L, 21, 2))
60+
val list1 = Literal(List(1, 2), ArrayType(IntegerType))
61+
val nullVal = Literal.create(null, IntegerType)
62+
63+
checkEvaluation(SortPrefix(SortOrder(b1, Ascending)), 0L)
64+
checkEvaluation(SortPrefix(SortOrder(b2, Ascending)), 1L)
65+
checkEvaluation(SortPrefix(SortOrder(i1, Ascending)), 20132983L)
66+
checkEvaluation(SortPrefix(SortOrder(i2, Ascending)), -20132983L)
67+
checkEvaluation(SortPrefix(SortOrder(l1, Ascending)), 20132983L)
68+
checkEvaluation(SortPrefix(SortOrder(l2, Ascending)), -20132983L)
69+
// For some reason, the Literal.create code gives us the number of days since the epoch
70+
checkEvaluation(SortPrefix(SortOrder(d1, Ascending)), 17649L)
71+
checkEvaluation(SortPrefix(SortOrder(t1, Ascending)), millis * 1000)
72+
checkEvaluation(SortPrefix(SortOrder(f1, Ascending)),
73+
DoublePrefixComparator.computePrefix(f1.value.asInstanceOf[Float].toDouble))
74+
checkEvaluation(SortPrefix(SortOrder(f2, Ascending)),
75+
DoublePrefixComparator.computePrefix(f2.value.asInstanceOf[Float].toDouble))
76+
checkEvaluation(SortPrefix(SortOrder(db1, Ascending)),
77+
DoublePrefixComparator.computePrefix(db1.value.asInstanceOf[Double]))
78+
checkEvaluation(SortPrefix(SortOrder(db2, Ascending)),
79+
DoublePrefixComparator.computePrefix(db2.value.asInstanceOf[Double]))
80+
checkEvaluation(SortPrefix(SortOrder(s1, Ascending)),
81+
StringPrefixComparator.computePrefix(s1.value.asInstanceOf[UTF8String]))
82+
checkEvaluation(SortPrefix(SortOrder(s2, Ascending)),
83+
StringPrefixComparator.computePrefix(s2.value.asInstanceOf[UTF8String]))
84+
checkEvaluation(SortPrefix(SortOrder(bin1, Ascending)),
85+
BinaryPrefixComparator.computePrefix(bin1.value.asInstanceOf[Array[Byte]]))
86+
checkEvaluation(SortPrefix(SortOrder(bin2, Ascending)),
87+
BinaryPrefixComparator.computePrefix(bin2.value.asInstanceOf[Array[Byte]]))
88+
checkEvaluation(SortPrefix(SortOrder(dec1, Ascending)), 20132983L)
89+
checkEvaluation(SortPrefix(SortOrder(dec2, Ascending)), 2013298L)
90+
checkEvaluation(SortPrefix(SortOrder(dec3, Ascending)),
91+
DoublePrefixComparator.computePrefix(201329.83d))
92+
checkEvaluation(SortPrefix(SortOrder(list1, Ascending)), 0L)
93+
checkEvaluation(SortPrefix(SortOrder(nullVal, Ascending)), null)
94+
}
95+
}

0 commit comments

Comments
 (0)