From 5122e65f2a7a6d83b4f8b338f99d1ac45e3254bf Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 7 Jul 2015 09:21:05 -0700 Subject: [PATCH 1/3] If types of all columns are NullTypes, do not use serializer2. --- .../sql/execution/SparkSqlSerializer2.scala | 10 ++++++++-- .../execution/SparkSqlSerializer2Suite.scala | 20 ++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 056d435eecd23..29c15b4155a91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -179,10 +179,13 @@ private[sql] object SparkSqlSerializer2 { /** * Check if rows with the given schema can be serialized with ShuffleSerializer. + * Right now, we do not support schemata with complex types, UDTs, or all data types + * of fields are NullTypes. */ def support(schema: Array[DataType]): Boolean = { if (schema == null) return true + var allNulTypes = true var i = 0 while (i < schema.length) { schema(i) match { @@ -190,12 +193,15 @@ private[sql] object SparkSqlSerializer2 { case array: ArrayType => return false case map: MapType => return false case struct: StructType => return false - case _ => + case NullType => // Do nothing + case _ => if (allNulTypes) allNulTypes = false // This is not a NullType. } i += 1 } - return true + // If types of fields are all NullTypes, we return false. + // Otherwise, we return true. + return !allNulTypes } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala index 8631e247c6c05..71f6b26bcd01a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala @@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite { } checkSupported(null, isSupported = true) - checkSupported(NullType, isSupported = true) checkSupported(BooleanType, isSupported = true) checkSupported(ByteType, isSupported = true) checkSupported(ShortType, isSupported = true) @@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite { checkSupported(DecimalType(10, 5), isSupported = true) checkSupported(DecimalType.Unlimited, isSupported = true) + // If NullType is the only data type in the schema, we do not support it. + checkSupported(NullType, isSupported = false) // For now, ArrayType, MapType, and StructType are not supported. checkSupported(ArrayType(DoubleType, true), isSupported = false) checkSupported(ArrayType(StringType, false), isSupported = false) @@ -170,6 +171,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll val df = ctx.sql(s"SELECT 1 + 1 FROM shuffle") checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer]) } + + test("types of fields are all NullTypes") { + // Test range partitioning code path. + val nulls = ctx.sql(s"SELECT null as a, null as b, null as c") + val df = nulls.unionAll(nulls).sort("a") + checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer]) + checkAnswer( + df, + Row(null, null, null) :: Row(null, null, null) :: Nil) + + // Test hash partitioning code path. + val oneRow = ctx.sql(s"SELECT DISTINCT null, null, null FROM shuffle") + checkSerializer(oneRow.queryExecution.executedPlan, classOf[SparkSqlSerializer]) + checkAnswer( + oneRow, + Row(null, null, null)) + } } /** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */ From e4568574b5eeec475b1ddbac6025619c94061c5b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 7 Jul 2015 10:10:40 -0700 Subject: [PATCH 2/3] Josh's comments. --- .../sql/execution/SparkSqlSerializer2.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 29c15b4155a91..93ae52b47567d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -179,29 +179,38 @@ private[sql] object SparkSqlSerializer2 { /** * Check if rows with the given schema can be serialized with ShuffleSerializer. - * Right now, we do not support schemata with complex types, UDTs, or all data types + * Right now, we do not support a schema having complex types or UDTs, or all data types * of fields are NullTypes. */ def support(schema: Array[DataType]): Boolean = { if (schema == null) return true - var allNulTypes = true + var allNullTypes = true var i = 0 while (i < schema.length) { schema(i) match { - case udt: UserDefinedType[_] => return false - case array: ArrayType => return false - case map: MapType => return false - case struct: StructType => return false case NullType => // Do nothing - case _ => if (allNulTypes) allNulTypes = false // This is not a NullType. + case udt: UserDefinedType[_] => + if (allNullTypes) allNullTypes = false + return false + case array: ArrayType => + if (allNullTypes) allNullTypes = false + return false + case map: MapType => + if (allNullTypes) allNullTypes = false + return false + case struct: StructType => + if (allNullTypes) allNullTypes = false + return false + case _ => + if (allNullTypes) allNullTypes = false } i += 1 } // If types of fields are all NullTypes, we return false. // Otherwise, we return true. - return !allNulTypes + return !allNullTypes } /** From cb58780be2838052475ffb8908d7e378b9b30be9 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 7 Jul 2015 13:11:33 -0700 Subject: [PATCH 3/3] Andrew's comment. --- .../spark/sql/execution/SparkSqlSerializer2.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 93ae52b47567d..6ed822dc70d68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -191,19 +191,19 @@ private[sql] object SparkSqlSerializer2 { schema(i) match { case NullType => // Do nothing case udt: UserDefinedType[_] => - if (allNullTypes) allNullTypes = false + allNullTypes = false return false case array: ArrayType => - if (allNullTypes) allNullTypes = false + allNullTypes = false return false case map: MapType => - if (allNullTypes) allNullTypes = false + allNullTypes = false return false case struct: StructType => - if (allNullTypes) allNullTypes = false + allNullTypes = false return false case _ => - if (allNullTypes) allNullTypes = false + allNullTypes = false } i += 1 }