From 69a9d8aacb3832f3e346a7b9dbb13dc522e4dc68 Mon Sep 17 00:00:00 2001 From: Xin Wu Date: Wed, 14 Sep 2016 21:12:43 -0700 Subject: [PATCH 1/2] complete the NULL ordering support in DataFrame APIs --- .../scala/org/apache/spark/sql/Column.scala | 31 +++++++++++++++++ .../org/apache/spark/sql/functions.scala | 26 ++++++++++++++ .../org/apache/spark/sql/DataFrameSuite.scala | 34 +++++++++++++++++++ .../apache/spark/sql/test/SQLTestData.scala | 15 ++++++++ 4 files changed, 106 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 844ca7a8e99c..e03a2d37b9b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -1019,6 +1019,21 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def desc: Column = withExpr { SortOrder(expr, Descending) } + /** + * Returns an ordering used in sorting. + * {{{ + * // Scala: sort a DataFrame by age column in descending order with NULLS FIRST. + * df.sort(df("age").desc_nulls_first) + * + * // Java + * df.sort(df.col("age").desc_nulls_first()); + * }}} + * + * @group expr_ops + * @since 2.1.0 + */ + def desc_nulls_first: Column = withExpr { SortOrder(expr, Descending, NullsFirst) } + /** * Returns an ordering used in sorting. * {{{ @@ -1034,6 +1049,22 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def asc: Column = withExpr { SortOrder(expr, Ascending) } + /** + * Returns an ordering used in sorting. + * {{{ + * // Scala: sort a DataFrame by age column in ascending order with NULLS LAST. + * df.sort(df("age").asc_nulls_last) + * + * // Java + * df.sort(df.col("age").asc_nulls_last()); + * }}} + * + * @group expr_ops + * @since 2.1.0 + */ + def asc_nulls_last: Column = withExpr { SortOrder(expr, Ascending, NullsLast) } + + /** * Prints the expression to the console for debugging purpose. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 18e736ab6986..cdb5b6f89a34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -118,6 +118,19 @@ object functions { */ def asc(columnName: String): Column = Column(columnName).asc + /** + * Returns a sort expression based on ascending order of the column with NULLS LAST. + * {{{ + * // Sort by dept in ascending order nulls first, and then age in descending order. + * df.sort(asc_nulls_last("dept"), desc("age")) + * }}} + * + * @group sort_funcs + * @since 2.1.0 + */ + def asc_nulls_last(columnName: String): Column = Column(columnName).asc_nulls_last + + /** * Returns a sort expression based on the descending order of the column. * {{{ @@ -130,6 +143,19 @@ object functions { */ def desc(columnName: String): Column = Column(columnName).desc + /** + * Returns a sort expression based on the descending order of the column with NULLS FIRST. + * {{{ + * // Sort by dept in ascending order, and then age in descending order NULLS FIRST. 
+ * df.sort(asc("dept"), desc_nulls_first("age")) + * }}} + * + * @group sort_funcs + * @since 2.1.0 + */ + def desc_nulls_first(columnName: String): Column = Column(columnName).desc_nulls_first + + ////////////////////////////////////////////////////////////////////////////////////////////// // Aggregate functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c2d256bdd335..f3fcbfe383c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -325,6 +325,40 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(6)) } + test("sorting with null ordering") { + checkAnswer( + nullableData.orderBy('a.asc_nulls_last, 'b.desc_nulls_first), + Seq( + Row(2, null), Row(2, "B"), Row(3, null), Row(4, "a"), + Row(5, "A"), Row(null, "c"), Row(null, "b") + ) + ) + + checkAnswer( + nullableData.orderBy(asc_nulls_last("a"), desc_nulls_first("b")), + Seq( + Row(2, null), Row(2, "B"), Row(3, null), Row(4, "a"), + Row(5, "A"), Row(null, "c"), Row(null, "b") + ) + ) + + checkAnswer( + nullableData.orderBy('a.desc_nulls_first, 'b.asc_nulls_last), + Seq( + Row(null, "b"), Row(null, "c"), Row(5, "A"), Row(4, "a"), + Row(3, null), Row(2, "B"), Row(2, null) + ) + ) + + checkAnswer( + nullableData.orderBy(desc_nulls_first("a"), asc_nulls_last("b")), + Seq( + Row(null, "b"), Row(null, "c"), Row(5, "A"), Row(4, "a"), + Row(3, null), Row(2, "B"), Row(2, null) + ) + ) + } + test("global sorting") { checkAnswer( testData2.orderBy('a.asc, 'b.asc), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 0cfe260e5215..ee31d5a739ac 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -169,6 +169,20 @@ private[sql] trait SQLTestData { self => rdd } + protected lazy val nullableData: DataFrame = { + val df = spark.sparkContext.parallelize( + NullableRecord(4, "a") :: + NullableRecord(null, "c") :: + NullableRecord(2, null) :: + NullableRecord(null, "b") :: + NullableRecord(3, null) :: + NullableRecord(5, "A") :: + NullableRecord(2, "B") :: Nil, 2 + ).toDF("a", "b") + df.createOrReplaceTempView("nullableData") + df + } + protected lazy val nullInts: DataFrame = { val df = spark.sparkContext.parallelize( NullInts(1) :: @@ -305,6 +319,7 @@ private[sql] object SQLTestData { case class IntField(i: Int) case class NullInts(a: Integer) case class NullStrings(n: Int, s: String) + case class NullableRecord(n: Integer, s: String) case class TableName(tableName: String) case class Person(id: Int, name: String, age: Int) case class Salary(personId: Int, salary: Double) From 02f5edf207e2d545a23f7c33d4f19ce8fc880041 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Fri, 16 Sep 2016 17:58:11 -0700 Subject: [PATCH 2/2] Updated. 
--- .../sql/catalyst/expressions/SortOrder.scala | 28 +++-------- .../codegen/GenerateOrdering.scala | 16 +++---- .../scala/org/apache/spark/sql/Column.scala | 43 ++++++++++++++--- .../org/apache/spark/sql/functions.scala | 35 +++++++++++--- ...dering.sql => order-by-nulls-ordering.sql} | 0 ...ql.out => order-by-nulls-ordering.sql.out} | 0 .../org/apache/spark/sql/DataFrameSuite.scala | 46 ++++++------------- .../apache/spark/sql/test/SQLTestData.scala | 15 ------ 8 files changed, 94 insertions(+), 89 deletions(-) rename sql/core/src/test/resources/sql-tests/inputs/{orderby-nulls-ordering.sql => order-by-nulls-ordering.sql} (100%) rename sql/core/src/test/resources/sql-tests/results/{orderby-nulls-ordering.sql.out => order-by-nulls-ordering.sql.out} (100%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index d015125bacca..3bebd552ef51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -54,10 +54,7 @@ case object NullsLast extends NullOrdering{ * An expression that can be used to sort a tuple. This class extends expression primarily so that * transformations over expression will descend into its child. */ -case class SortOrder( - child: Expression, - direction: SortDirection, - nullOrdering: NullOrdering) +case class SortOrder(child: Expression, direction: SortDirection, nullOrdering: NullOrdering) extends UnaryExpression with Unevaluable { /** Sort order is not foldable because we don't have an eval for it. 
*/ @@ -94,17 +91,9 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { val nullValue = child.child.dataType match { case BooleanType | DateType | TimestampType | _: IntegralType => - if (nullAsSmallest) { - Long.MinValue - } else { - Long.MaxValue - } + if (nullAsSmallest) Long.MinValue else Long.MaxValue case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => - if (nullAsSmallest) { - Long.MinValue - } else { - Long.MaxValue - } + if (nullAsSmallest) Long.MinValue else Long.MaxValue case _: DecimalType => if (nullAsSmallest) { DoublePrefixComparator.computePrefix(Double.NegativeInfinity) @@ -112,16 +101,13 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { DoublePrefixComparator.computePrefix(Double.NaN) } case _ => - if (nullAsSmallest) { - 0L - } else { - -1L - } + if (nullAsSmallest) 0L else -1L } - private def nullAsSmallest: Boolean = (child.isAscending && child.nullOrdering == NullsFirst) || + private def nullAsSmallest: Boolean = { + (child.isAscending && child.nullOrdering == NullsFirst) || (!child.isAscending && child.nullOrdering == NullsLast) - + } override def eval(input: InternalRow): Any = throw new UnsupportedOperationException diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index e7df95e1142c..f1c30ef6c7fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -100,16 +100,16 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR // Nothing } else if ($isNullA) { return ${ - order.nullOrdering match { - case NullsFirst => "-1" - case NullsLast => "1" - }}; + order.nullOrdering match { + case NullsFirst => "-1" + case NullsLast => 
"1" + }}; } else if ($isNullB) { return ${ - order.nullOrdering match { - case NullsFirst => "1" - case NullsLast => "-1" - }}; + order.nullOrdering match { + case NullsFirst => "1" + case NullsLast => "-1" + }}; } else { int comp = ${ctx.genComp(order.child.dataType, primitiveA, primitiveB)}; if (comp != 0) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index e03a2d37b9b4..63da501f18cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -1007,7 +1007,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { /** * Returns an ordering used in sorting. * {{{ - * // Scala: sort a DataFrame by age column in descending order. + * // Scala * df.sort(df("age").desc) * * // Java @@ -1020,9 +1020,9 @@ class Column(protected[sql] val expr: Expression) extends Logging { def desc: Column = withExpr { SortOrder(expr, Descending) } /** - * Returns an ordering used in sorting. + * Returns a descending ordering used in sorting, where null values appear before non-null values. * {{{ - * // Scala: sort a DataFrame by age column in descending order with NULLS FIRST. + * // Scala: sort a DataFrame by age column in descending order and null values appearing first. * df.sort(df("age").desc_nulls_first) * * // Java @@ -1035,7 +1035,22 @@ class Column(protected[sql] val expr: Expression) extends Logging { def desc_nulls_first: Column = withExpr { SortOrder(expr, Descending, NullsFirst) } /** - * Returns an ordering used in sorting. + * Returns a descending ordering used in sorting, where null values appear after non-null values. + * {{{ + * // Scala: sort a DataFrame by age column in descending order and null values appearing last. 
+ * df.sort(df("age").desc_nulls_last) + * + * // Java + * df.sort(df.col("age").desc_nulls_last()); + * }}} + * + * @group expr_ops + * @since 2.1.0 + */ + def desc_nulls_last: Column = withExpr { SortOrder(expr, Descending, NullsLast) } + + /** + * Returns an ascending ordering used in sorting. * {{{ * // Scala: sort a DataFrame by age column in ascending order. * df.sort(df("age").asc) @@ -1050,9 +1065,9 @@ class Column(protected[sql] val expr: Expression) extends Logging { def asc: Column = withExpr { SortOrder(expr, Ascending) } /** - * Returns an ordering used in sorting. + * Returns an ascending ordering used in sorting, where null values appear before non-null values. * {{{ - * // Scala: sort a DataFrame by age column in ascending order with NULLS LAST. + * // Scala: sort a DataFrame by age column in ascending order and null values appearing first. * df.sort(df("age").asc_nulls_last) * * // Java * df.sort(df.col("age").asc_nulls_last()); * }}} * * @group expr_ops * @since 2.1.0 */ - def asc_nulls_last: Column = withExpr { SortOrder(expr, Ascending, NullsLast) } + def asc_nulls_first: Column = withExpr { SortOrder(expr, Ascending, NullsFirst) } + /** + * Returns an ascending ordering used in sorting, where null values appear after non-null values. + * {{{ + * // Scala: sort a DataFrame by age column in ascending order and null values appearing last. + * df.sort(df("age").asc_nulls_last) + * + * // Java + * df.sort(df.col("age").asc_nulls_last()); + * }}} + * + * @group expr_ops + * @since 2.1.0 + */ + def asc_nulls_last: Column = withExpr { SortOrder(expr, Ascending, NullsLast) } /** * Prints the expression to the console for debugging purpose. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index cdb5b6f89a34..9bdb2d3b7195 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -109,7 +109,6 @@ object functions { /** * Returns a sort expression based on ascending order of the column. * {{{ - * // Sort by dept in ascending order, and then age in descending order. * df.sort(asc("dept"), desc("age")) * }}} * * @@ -119,22 +118,32 @@ def asc(columnName: String): Column = Column(columnName).asc /** - * Returns a sort expression based on ascending order of the column with NULLS LAST. + * Returns a sort expression based on ascending order of the column, + * and null values appear before non-null values. * {{{ - * // Sort by dept in ascending order nulls first, and then age in descending order. * df.sort(asc_nulls_last("dept"), desc("age")) * }}} * * @group sort_funcs * @since 2.1.0 */ - def asc_nulls_last(columnName: String): Column = Column(columnName).asc_nulls_last + def asc_nulls_first(columnName: String): Column = Column(columnName).asc_nulls_first + /** + * Returns a sort expression based on ascending order of the column, + * and null values appear after non-null values. + * {{{ + * df.sort(asc_nulls_last("dept"), desc("age")) + * }}} + * + * @group sort_funcs + * @since 2.1.0 + */ + def asc_nulls_last(columnName: String): Column = Column(columnName).asc_nulls_last /** * Returns a sort expression based on the descending order of the column. * {{{ - * // Sort by dept in ascending order, and then age in descending order. * df.sort(asc("dept"), desc("age")) * }}} * * @@ -144,9 +153,9 @@ def desc(columnName: String): Column = Column(columnName).desc /** - * Returns a sort expression based on the descending order of the column with NULLS FIRST. 
+ * Returns a sort expression based on the descending order of the column, + * and null values appear before non-null values. * {{{ - * // Sort by dept in ascending order, and then age in descending order NULLS FIRST. * df.sort(asc("dept"), desc_nulls_first("age")) * }}} * @@ -155,6 +164,18 @@ object functions { */ def desc_nulls_first(columnName: String): Column = Column(columnName).desc_nulls_first + /** + * Returns a sort expression based on the descending order of the column, + * and null values appear after non-null values. + * {{{ + * df.sort(asc("dept"), desc_nulls_last("age")) + * }}} + * + * @group sort_funcs + * @since 2.1.0 + */ + def desc_nulls_last(columnName: String): Column = Column(columnName).desc_nulls_last + ////////////////////////////////////////////////////////////////////////////////////////////// // Aggregate functions diff --git a/sql/core/src/test/resources/sql-tests/inputs/orderby-nulls-ordering.sql b/sql/core/src/test/resources/sql-tests/inputs/order-by-nulls-ordering.sql similarity index 100% rename from sql/core/src/test/resources/sql-tests/inputs/orderby-nulls-ordering.sql rename to sql/core/src/test/resources/sql-tests/inputs/order-by-nulls-ordering.sql diff --git a/sql/core/src/test/resources/sql-tests/results/orderby-nulls-ordering.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out similarity index 100% rename from sql/core/src/test/resources/sql-tests/results/orderby-nulls-ordering.sql.out rename to sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f3fcbfe383c0..27962a566f6e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -326,37 +326,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } 
test("sorting with null ordering") { - checkAnswer( - nullableData.orderBy('a.asc_nulls_last, 'b.desc_nulls_first), - Seq( - Row(2, null), Row(2, "B"), Row(3, null), Row(4, "a"), - Row(5, "A"), Row(null, "c"), Row(null, "b") - ) - ) - - checkAnswer( - nullableData.orderBy(asc_nulls_last("a"), desc_nulls_first("b")), - Seq( - Row(2, null), Row(2, "B"), Row(3, null), Row(4, "a"), - Row(5, "A"), Row(null, "c"), Row(null, "b") - ) - ) - - checkAnswer( - nullableData.orderBy('a.desc_nulls_first, 'b.asc_nulls_last), - Seq( - Row(null, "b"), Row(null, "c"), Row(5, "A"), Row(4, "a"), - Row(3, null), Row(2, "B"), Row(2, null) - ) - ) - - checkAnswer( - nullableData.orderBy(desc_nulls_first("a"), asc_nulls_last("b")), - Seq( - Row(null, "b"), Row(null, "c"), Row(5, "A"), Row(4, "a"), - Row(3, null), Row(2, "B"), Row(2, null) - ) - ) + val data = Seq[java.lang.Integer](2, 1, null).toDF("key") + + checkAnswer(data.orderBy('key.asc), Row(null) :: Row(1) :: Row(2) :: Nil) + checkAnswer(data.orderBy(asc("key")), Row(null) :: Row(1) :: Row(2) :: Nil) + checkAnswer(data.orderBy('key.asc_nulls_first), Row(null) :: Row(1) :: Row(2) :: Nil) + checkAnswer(data.orderBy(asc_nulls_first("key")), Row(null) :: Row(1) :: Row(2) :: Nil) + checkAnswer(data.orderBy('key.asc_nulls_last), Row(1) :: Row(2) :: Row(null) :: Nil) + checkAnswer(data.orderBy(asc_nulls_last("key")), Row(1) :: Row(2) :: Row(null) :: Nil) + + checkAnswer(data.orderBy('key.desc), Row(2) :: Row(1) :: Row(null) :: Nil) + checkAnswer(data.orderBy(desc("key")), Row(2) :: Row(1) :: Row(null) :: Nil) + checkAnswer(data.orderBy('key.desc_nulls_first), Row(null) :: Row(2) :: Row(1) :: Nil) + checkAnswer(data.orderBy(desc_nulls_first("key")), Row(null) :: Row(2) :: Row(1) :: Nil) + checkAnswer(data.orderBy('key.desc_nulls_last), Row(2) :: Row(1) :: Row(null) :: Nil) + checkAnswer(data.orderBy(desc_nulls_last("key")), Row(2) :: Row(1) :: Row(null) :: Nil) } test("global sorting") { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index ee31d5a739ac..0cfe260e5215 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -169,20 +169,6 @@ private[sql] trait SQLTestData { self => rdd } - protected lazy val nullableData: DataFrame = { - val df = spark.sparkContext.parallelize( - NullableRecord(4, "a") :: - NullableRecord(null, "c") :: - NullableRecord(2, null) :: - NullableRecord(null, "b") :: - NullableRecord(3, null) :: - NullableRecord(5, "A") :: - NullableRecord(2, "B") :: Nil, 2 - ).toDF("a", "b") - df.createOrReplaceTempView("nullableData") - df - } - protected lazy val nullInts: DataFrame = { val df = spark.sparkContext.parallelize( NullInts(1) :: @@ -319,7 +305,6 @@ private[sql] object SQLTestData { case class IntField(i: Int) case class NullInts(a: Integer) case class NullStrings(n: Int, s: String) - case class NullableRecord(n: Integer, s: String) case class TableName(tableName: String) case class Person(id: Int, name: String, age: Int) case class Salary(personId: Int, salary: Double)