From 43d08c7d96f8aedf2e2f90377ff38734a59fee2d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 5 Nov 2016 15:07:38 +0900
Subject: [PATCH 01/25] initial commit

---
 .../sql/catalyst/encoders/ExpressionEncoder.scala  |  2 +-
 .../catalyst/encoders/ExpressionEncoderSuite.scala | 14 +++++++++++++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  7 +++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 82e1a8a7cad96..9e15b9e6d930f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -50,7 +50,7 @@ object ExpressionEncoder {
     val cls = mirror.runtimeClass(tpe)
     val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
-    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = true)
+    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = !cls.isPrimitive)
     val nullSafeInput = if (flat) {
       inputObject
     } else {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 4d896c2e38f10..f6f0d9a96e506 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -24,7 +24,7 @@ import java.util.Arrays
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.{Encoder, Encoders}
 import org.apache.spark.sql.catalyst.{OptionalData, PrimitiveData}
 import org.apache.spark.sql.catalyst.analysis.AnalysisTest
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -338,6 +338,18 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
     }
   }
 
+  test("nullable of encoder serializer") {
+    def checkNullable[T: Encoder](nullable: Boolean*): Unit = {
+      assert(encoderFor[T].serializer.map(_.nullable) === nullable.toSeq)
+    }
+
+    // test for flat encoders
+    checkNullable[Int](false)
+    checkNullable[Option[Int]](true)
+    checkNullable[java.lang.Integer](true)
+    checkNullable[String](true)
+  }
+
   test("null check for map key") {
     val encoder = ExpressionEncoder[Map[String, Int]]()
     val e = intercept[RuntimeException](encoder.toRow(Map(("a", 1), (null, 2))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 81fa8cbf22384..feb9a25f8060f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -969,6 +969,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(dataset.collect() sameElements Array(resultValue, resultValue))
   }
 
+  test("SPARK-18284: Serializer should have correct nullable value") {
+    val df1 = sparkContext.parallelize(Seq(1, 2, 3, 4), 1).toDF()
+    assert(df1.schema(0).nullable == false)
+    val df2 = sparkContext.parallelize(Seq(Integer.valueOf(1), Integer.valueOf(2)), 1).toDF()
+    assert(df2.schema(0).nullable == true)
+  }
+
   Seq(true, false).foreach { eager =>
     def testCheckpointing(testName: String)(f: => Unit): Unit = {
       test(s"Dataset.checkpoint() - $testName (eager = $eager)") {

From 0129d2d3d1a2f00ef1dcc4ce73de77b32e5160a8 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 6 Nov 2016 15:09:06 +0900
Subject: [PATCH 02/25] add test cases to confirm this PR works for arrays of
 primitive types

---
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index feb9a25f8060f..7a28b370751d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 class DatasetSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -970,10 +970,17 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-18284: Serializer should have correct nullable value") {
-    val df1 = sparkContext.parallelize(Seq(1, 2, 3, 4), 1).toDF()
+    val df1 = sparkContext.parallelize(Seq(1, 2, 3, 4), 1).toDF
     assert(df1.schema(0).nullable == false)
-    val df2 = sparkContext.parallelize(Seq(Integer.valueOf(1), Integer.valueOf(2)), 1).toDF()
+    val df2 = sparkContext.parallelize(Seq(Integer.valueOf(1), Integer.valueOf(2)), 1).toDF
     assert(df2.schema(0).nullable == true)
+
+    val df3 = sparkContext.parallelize(Seq(Seq(1, 2), Seq(3, 4)), 1).toDF()
+    assert(df3.schema(0).nullable == true)
+    assert(df3.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
+    val df4 = sparkContext.parallelize(Seq(Seq("a", "b"), Seq("c", "d")), 1).toDF()
+    assert(df4.schema(0).nullable == true)
+    assert(df4.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
   }
 
   Seq(true, false).foreach { eager =>
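Note on PATCH 01-02: with BoundReference's nullability now derived from the runtime class, a flat encoder over an unboxed primitive yields a non-nullable column, while a boxed java.lang.Integer stays nullable. A minimal standalone sketch of the expected behavior (illustration only, not part of the series; it assumes a local SparkSession can be created):

    import org.apache.spark.sql.{Encoders, SparkSession}

    object NullableDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("nullable-demo").getOrCreate()
        // Unboxed Int can never hold null, so the schema should report nullable = false.
        println(spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt).schema(0).nullable)       // false
        // Boxed java.lang.Integer can hold null, so the column stays nullable.
        println(spark.createDataset(Seq(Integer.valueOf(1)))(Encoders.INT).schema(0).nullable) // true
        spark.stop()
      }
    }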
From 05ac1e36514a283a3410d986db625f809b3990fc Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 10 Nov 2016 12:09:14 +0900
Subject: [PATCH 03/25] addressed review comment

---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7a28b370751d8..b9506897b2d13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -970,15 +970,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-18284: Serializer should have correct nullable value") {
-    val df1 = sparkContext.parallelize(Seq(1, 2, 3, 4), 1).toDF
+    val df1 = Seq(1, 2, 3, 4).toDF
     assert(df1.schema(0).nullable == false)
-    val df2 = sparkContext.parallelize(Seq(Integer.valueOf(1), Integer.valueOf(2)), 1).toDF
+    val df2 = Seq(Integer.valueOf(1), Integer.valueOf(2)).toDF
     assert(df2.schema(0).nullable == true)
 
-    val df3 = sparkContext.parallelize(Seq(Seq(1, 2), Seq(3, 4)), 1).toDF()
+    val df3 = Seq(Seq(1, 2), Seq(3, 4)).toDF
     assert(df3.schema(0).nullable == true)
     assert(df3.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
-    val df4 = sparkContext.parallelize(Seq(Seq("a", "b"), Seq("c", "d")), 1).toDF()
+    val df4 = Seq(Seq("a", "b"), Seq("c", "d")).toDF
     assert(df4.schema(0).nullable == true)
     assert(df4.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
   }

From d679b65ac65b1e37a1896708e3ff4d6906b001f6 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 10 Nov 2016 18:05:48 +0900
Subject: [PATCH 04/25] use the schema obtained from the serializer for
 ExpressionEncoder

---
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 9e15b9e6d930f..8d4a0d441f0e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -61,10 +61,7 @@ object ExpressionEncoder {
     val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
     val deserializer = ScalaReflection.deserializerFor[T]
 
-    val schema = ScalaReflection.schemaFor[T] match {
-      case ScalaReflection.Schema(s: StructType, _) => s
-      case ScalaReflection.Schema(dt, nullable) => new StructType().add("value", dt, nullable)
-    }
+    val schema = serializer.dataType
 
     new ExpressionEncoder[T](
       schema,

From f877d8f5d3c0d26cee97a04d67940ba5d82ac328 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 15 Nov 2016 00:21:25 +0900
Subject: [PATCH 05/25] support primitive type in a struct

---
 .../spark/sql/catalyst/ScalaReflection.scala      |  7 ++++++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7bcaea7ea2f79..b67ceb167f9d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -590,7 +590,12 @@ object ScalaReflection extends ScalaReflection {
             "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
         }
 
-        val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
+        val fieldValue = if (!fieldType.typeSymbol.asClass.isPrimitive) {
+          Invoke(inputObject, fieldName, dataTypeFor(fieldType))
+        } else {
+          AssertNotNull(Invoke(inputObject, fieldName, dataTypeFor(fieldType)),
+            Seq("primitive type should not have null"))
+        }
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index b9506897b2d13..c83f2e2763def 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
+case class TestDataPoint(x: Int, y: Double, s: String)
 
 class DatasetSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -981,6 +982,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val df4 = Seq(Seq("a", "b"), Seq("c", "d")).toDF
     assert(df4.schema(0).nullable == true)
     assert(df4.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
+
+    val df5 = Seq((0, 1.0), (2, 2.0)).toDF("id", "v")
+    assert(df5.schema(0).nullable == false)
+    assert(df5.schema(1).nullable == false)
+    val df6 = Seq((0, 1.0, 3.0), (2, 2.0, 5.0)).toDF("id", "v1", "v2")
+    assert(df6.schema(0).nullable == false)
+    assert(df6.schema(1).nullable == false)
+    assert(df6.schema(2).nullable == false)
+
+    val df7 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
+    assert(df7.schema(0).nullable == false)
+    assert(df7.schema(1).nullable == false)
+    assert(df7.schema(2).nullable == true)
   }
 
   Seq(true, false).foreach { eager =>

From f35cef06fc0d2d54008c8a2caa906dcfd63a760c Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 15 Nov 2016 02:49:44 +0900
Subject: [PATCH 06/25] support MapType

---
 .../org/apache/spark/sql/types/MapType.scala |  7 ++++++-
 .../org/apache/spark/sql/DatasetSuite.scala  | 16 ++++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
index 3a32aa43d1c3a..0f8494ce22bc8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -94,5 +94,10 @@ object MapType extends AbstractDataType {
    * The `valueContainsNull` is true.
    */
   def apply(keyType: DataType, valueType: DataType): MapType =
-    MapType(keyType: DataType, valueType: DataType, valueContainsNull = true)
+    MapType(keyType: DataType, valueType: DataType, valueContainsNull =
+      valueType match {
+        case BooleanType | ByteType | ShortType | IntegerType | LongType |
+          FloatType | DoubleType => false
+        case _ => true
+      })
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c83f2e2763def..fd7458fb240be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -991,10 +991,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(df6.schema(1).nullable == false)
     assert(df6.schema(2).nullable == false)
 
-    val df7 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
-    assert(df7.schema(0).nullable == false)
-    assert(df7.schema(1).nullable == false)
-    assert(df7.schema(2).nullable == true)
+    val df7 = (Tuple1(Array(1, 2, 3)) :: Nil).toDF("a")
+    assert(df7.schema(0).nullable == true)
+    assert(df7.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
+
+    val df8 = (Tuple1(Map(2 -> 3)) :: Nil).toDF("m")
+    assert(df8.schema(0).nullable == true)
+    assert(df8.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == false)
+
+    val df9 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
+    assert(df9.schema(0).nullable == false)
+    assert(df9.schema(1).nullable == false)
+    assert(df9.schema(2).nullable == true)
   }
 
   Seq(true, false).foreach { eager =>
From 7352b3bb621753bbc3bc88451c8d6f74d912435a Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 17 Nov 2016 23:57:01 +0900
Subject: [PATCH 07/25] fix test failures for handling Map with primitive
 types

---
 .../expressions/objects/objects.scala        | 10 ++++++++--
 .../org/apache/spark/sql/types/MapType.scala |  7 +------
 .../org/apache/spark/sql/DatasetSuite.scala  | 20 +++++++++++++------
 3 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5c27179ec3b46..45617afaa52dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -643,8 +643,14 @@ case class ExternalMapToCatalyst private(
 
   override def foldable: Boolean = false
 
-  override def dataType: MapType = MapType(
-    keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable)
+  override def dataType: MapType = {
+    val isPrimitiveType = valueType match {
+      case BooleanType | ByteType | ShortType | IntegerType | LongType |
+        FloatType | DoubleType => true
+      case _ => false
+    }
+    MapType(keyConverter.dataType, valueConverter.dataType, !isPrimitiveType)
+  }
 
   override def eval(input: InternalRow): Any =
     throw new UnsupportedOperationException("Only code-generated evaluation is supported")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
index 0f8494ce22bc8..3a32aa43d1c3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -94,10 +94,5 @@ object MapType extends AbstractDataType {
    * The `valueContainsNull` is true.
   */
   def apply(keyType: DataType, valueType: DataType): MapType =
-    MapType(keyType: DataType, valueType: DataType, valueContainsNull =
-      valueType match {
-        case BooleanType | ByteType | ShortType | IntegerType | LongType |
-          FloatType | DoubleType => false
-        case _ => true
-      })
+    MapType(keyType: DataType, valueType: DataType, valueContainsNull = true)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index fd7458fb240be..1fd825b6970e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -995,14 +995,22 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(df7.schema(0).nullable == true)
     assert(df7.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
 
-    val df8 = (Tuple1(Map(2 -> 3)) :: Nil).toDF("m")
+    val df8 = (Tuple1(Array((null: Integer), (null: Integer))) :: Nil).toDF("a")
     assert(df8.schema(0).nullable == true)
-    assert(df8.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == false)
+    assert(df8.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
 
-    val df9 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
-    assert(df9.schema(0).nullable == false)
-    assert(df9.schema(1).nullable == false)
-    assert(df9.schema(2).nullable == true)
+    val df9 = (Tuple1(Map(2 -> 3)) :: Nil).toDF("m")
+    assert(df9.schema(0).nullable == true)
+    assert(df9.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == false)
+
+    val df10 = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("m")
+    assert(df10.schema(0).nullable == true)
+    assert(df10.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == true)
+
+    val df11 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
+    assert(df11.schema(0).nullable == false)
+    assert(df11.schema(1).nullable == false)
+    assert(df11.schema(2).nullable == true)
   }
 
   Seq(true, false).foreach { eager =>

From bd25130b6f6a4e24239d25cb97e31c5c683f9df2 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 18 Nov 2016 03:05:43 +0900
Subject: [PATCH 08/25] fix test failure

---
 .../org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 09613ef9e4348..54efae3fb4627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -86,7 +86,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDf = spark.read.parquet(outputDir)
     val expectedSchema = new StructType()
-      .add(StructField("value", IntegerType))
+      .add(StructField("value", IntegerType, nullable = false))
      .add(StructField("id", IntegerType))
     assert(outputDf.schema === expectedSchema)
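Note on PATCH 08: once value columns stop being unconditionally nullable, any test that compares whole schemas has to spell the flag out, because StructType equality (plain case-class equality) includes nullable. A small self-contained illustration (only the public types API is used):

    import org.apache.spark.sql.types._

    val expectedOld = new StructType().add(StructField("value", IntegerType))                  // nullable defaults to true
    val expectedNew = new StructType().add(StructField("value", IntegerType, nullable = false))
    println(expectedOld == expectedNew) // false -- exactly why the suite above started failing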
From d45775e8472d6fb03899af706bb2c4739a6fa1ac Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 18 Nov 2016 17:11:10 +0900
Subject: [PATCH 10/25] addressed review comment

---
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala   | 2 +-
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 566df198ea79f..dfc70f2f0a310 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -591,7 +591,7 @@ object ScalaReflection extends ScalaReflection {
         }
 
         val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
-          returnNullable = inputObject.nullable || !fieldType.typeSymbol.asClass.isPrimitive)
+          returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 472e3658d7bdc..ed859b252f68e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -180,7 +180,7 @@ case class Invoke(
     propagateNull: Boolean = true,
     returnNullable : Boolean = true) extends Expression with NonSQLExpression {
 
-  override def nullable: Boolean = returnNullable
+  override def nullable: Boolean = targetObject.nullable || returnNullable
   override def children: Seq[Expression] = targetObject +: arguments
 
   override def eval(input: InternalRow): Any =

From b3dac38e7bd3785fd814b9596bf25c8650ea6e37 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 22 Nov 2016 12:57:21 +0900
Subject: [PATCH 11/25] address comments of @cloud-fan, @ueshin, and @viirya
 to add a nullable flag to LambdaVariable

---
 .../apache/spark/sql/catalyst/JavaTypeInference.scala |  5 ++++-
 .../apache/spark/sql/catalyst/ScalaReflection.scala   |  3 ++-
 .../sql/catalyst/expressions/objects/objects.scala    | 10 ++++++----
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 04f0cfce883f2..7464aad35f45c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -396,12 +396,15 @@ object JavaTypeInference {
 
       case _ if mapType.isAssignableFrom(typeToken) =>
         val (keyType, valueType) = mapKeyValueType(typeToken)
+        val (_, valueNullable) = inferDataType(valueType)
+
         ExternalMapToCatalyst(
           inputObject,
           ObjectType(keyType.getRawType),
           serializerFor(_, keyType),
           ObjectType(valueType.getRawType),
-          serializerFor(_, valueType)
+          serializerFor(_, valueType),
+          valueNullable
         )
 
       case other =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index dfc70f2f0a310..bd9db5bc1d184 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -498,7 +498,8 @@ object ScalaReflection extends ScalaReflection {
           dataTypeFor(keyType),
           serializerFor(_, keyType, keyPath),
           dataTypeFor(valueType),
-          serializerFor(_, valueType, valuePath))
+          serializerFor(_, valueType, valuePath),
+          !valueType.typeSymbol.asClass.isPrimitive)
 
       case t if t <:< localTypeOf[String] =>
         StaticInvoke(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index ed859b252f68e..19bfc88242910 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -406,10 +406,11 @@ case class WrapOption(child: Expression, optType: DataType)
 * A place holder for the loop variable used in [[MapObjects]]. This should never be constructed
 * manually, but will instead be passed into the provided lambda function.
 */
-case class LambdaVariable(value: String, isNull: String, dataType: DataType) extends LeafExpression
+case class LambdaVariable(value: String, isNull: String, dataType: DataType,
+    valueNullable: Boolean = true) extends LeafExpression
   with Unevaluable with NonSQLExpression {
 
-  override def nullable: Boolean = true
+  override def nullable: Boolean = valueNullable
 
   override def genCode(ctx: CodegenContext): ExprCode = {
     ExprCode(code = "", value = value, isNull = isNull)
@@ -593,7 +594,8 @@ object ExternalMapToCatalyst {
       keyType: DataType,
       keyConverter: Expression => Expression,
       valueType: DataType,
-      valueConverter: Expression => Expression): ExternalMapToCatalyst = {
+      valueConverter: Expression => Expression,
+      valueNullable: Boolean): ExternalMapToCatalyst = {
     val id = curId.getAndIncrement()
     val keyName = "ExternalMapToCatalyst_key" + id
     val valueName = "ExternalMapToCatalyst_value" + id
@@ -606,7 +608,7 @@ object ExternalMapToCatalyst {
       valueName,
       valueIsNull,
       valueType,
-      valueConverter(LambdaVariable(valueName, valueIsNull, valueType)),
+      valueConverter(LambdaVariable(valueName, valueIsNull, valueType, valueNullable)),
       inputMap
     )
   }
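Note on PATCH 09-11: nullability now flows through the serializer expression tree: Invoke carries a returnNullable hint, and LambdaVariable/ExternalMapToCatalyst carry the value's nullability. A toy model of the propagation rule, with hypothetical names (this is not Spark's API, just the shape of the logic in PATCH 10):

    // Hypothetical stand-in for the Catalyst expression touched above.
    final case class FieldAccess(targetNullable: Boolean, returnNullable: Boolean) {
      // Mirrors Invoke.nullable after PATCH 10: null can come either from a
      // null target object or from the invoked method itself.
      def nullable: Boolean = targetNullable || returnNullable
    }

    println(FieldAccess(targetNullable = false, returnNullable = false).nullable) // false: primitive field
    println(FieldAccess(targetNullable = true,  returnNullable = false).nullable) // true: target may be null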
From 3a1a8b54089741fa764124eac0285790991d6163 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 22 Nov 2016 17:01:19 +0900
Subject: [PATCH 12/25] merge with master

---
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 19bfc88242910..09d5c0ed9de77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -178,7 +178,7 @@ case class Invoke(
     dataType: DataType,
     arguments: Seq[Expression] = Nil,
     propagateNull: Boolean = true,
-    returnNullable : Boolean = true) extends Expression with NonSQLExpression {
+    returnNullable : Boolean = true) extends InvokeLike {
 
   override def nullable: Boolean = targetObject.nullable || returnNullable
   override def children: Seq[Expression] = targetObject +: arguments

From d5f6e6ac23b642ed6caf443bf9c5e4336da5175b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 26 Nov 2016 02:53:34 +0900
Subject: [PATCH 13/25] fix test failures of nested struct

---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd9db5bc1d184..f8b2997cf8064 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -591,8 +591,11 @@ object ScalaReflection extends ScalaReflection {
             "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
         }
 
+        // primitive take only non-null or struct takes non-null object guarded by isNull
+        val returnNonNull = fieldType.typeSymbol.asClass.isPrimitive ||
+          definedByConstructorParams(fieldType)
         val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
-          returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
+          returnNullable = !returnNonNull)
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil

From 725db33eed86ab74ebbdfcca84160d34275efd22 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 26 Nov 2016 02:54:06 +0900
Subject: [PATCH 14/25] add test suite for nested struct

---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 1fd825b6970e9..2b194f9305dd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -30,7 +30,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-case class TestDataPoint(x: Int, y: Double, s: String)
+case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
+case class TestDataPoint2(x: Int, s: String)
 
 class DatasetSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -1007,10 +1009,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(df10.schema(0).nullable == true)
     assert(df10.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == true)
 
-    val df11 = Seq(TestDataPoint(1, 2.2, "a"), TestDataPoint(3, 4.4, "null")).toDF
+    val df11 = Seq(TestDataPoint(1, 2.2, "a", null),
+      TestDataPoint(3, 4.4, "null", (TestDataPoint2(33, "b")))).toDF
     assert(df11.schema(0).nullable == false)
     assert(df11.schema(1).nullable == false)
     assert(df11.schema(2).nullable == true)
+    assert(df11.schema(3).nullable == true)
+    assert(df11.schema(3).dataType.asInstanceOf[StructType].fields(0).nullable == false)
+    assert(df11.schema(3).dataType.asInstanceOf[StructType].fields(1).nullable == true)
   }
 
   Seq(true, false).foreach { eager =>

From d69a234e436ce8f3f9ebdd37ef7d8b9f19040c33 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 28 Nov 2016 03:02:39 +0900
Subject: [PATCH 15/25] addressed review comments

---
 .../sql/catalyst/expressions/objects/objects.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 09d5c0ed9de77..9cf00000deba1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -413,7 +413,7 @@ case class LambdaVariable(value: String, isNull: String, dataType: DataType,
   override def nullable: Boolean = valueNullable
 
   override def genCode(ctx: CodegenContext): ExprCode = {
-    ExprCode(code = "", value = value, isNull = isNull)
+    ExprCode(code = "", value = value, isNull = if (nullable) isNull else "false")
   }
 }
@@ -604,7 +604,7 @@ object ExternalMapToCatalyst {
     ExternalMapToCatalyst(
       keyName,
       keyType,
-      keyConverter(LambdaVariable(keyName, "false", keyType)),
+      keyConverter(LambdaVariable(keyName, "false", keyType, false)),
       valueName,
       valueIsNull,
       valueType,
@@ -647,12 +647,7 @@ case class ExternalMapToCatalyst private(
   override def foldable: Boolean = false
 
   override def dataType: MapType = {
-    val isPrimitiveType = valueType match {
-      case BooleanType | ByteType | ShortType | IntegerType | LongType |
-        FloatType | DoubleType => true
-      case _ => false
-    }
-    MapType(keyConverter.dataType, valueConverter.dataType, !isPrimitiveType)
+    MapType(keyConverter.dataType, valueConverter.dataType, valueConverter.nullable)
   }
 
   override def eval(input: InternalRow): Any =
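Note on PATCH 15: when a LambdaVariable is statically non-nullable, its generated code can use the literal "false" as the null flag instead of reading a mutable state variable, letting downstream codegen elide null checks. A toy model of that decision (ExprCodeModel is hypothetical, not the real CodegenContext/ExprCode):

    final case class ExprCodeModel(code: String, value: String, isNull: String)

    def genCodeFor(value: String, isNullVar: String, nullable: Boolean): ExprCodeModel =
      ExprCodeModel(code = "", value = value, isNull = if (nullable) isNullVar else "false")

    println(genCodeFor("value1", "isNull1", nullable = true))  // isNull reads the generated variable
    println(genCodeFor("value1", "isNull1", nullable = false)) // isNull is the constant "false"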
From 214c6bb2d7aaf773d01a846795eb78f1e07e4ed1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 28 Nov 2016 14:56:58 +0900
Subject: [PATCH 16/25] addressed review comments

---
 .../catalyst/expressions/ReferenceToExpressions.scala    | 3 ++-
 .../spark/sql/catalyst/expressions/objects/objects.scala | 9 +++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 6c75a7a50214f..efc66fdd8dd6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -74,7 +74,8 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression])
       ctx.addMutableState("boolean", classChildVarIsNull, "")
 
       val classChildVar =
-        LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType)
+        LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType,
+          childGen.isNull != "false")
 
       val initCode = s"${classChildVar.value} = ${childGen.value};\n" +
         s"${classChildVar.isNull} = ${childGen.isNull};"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 9cf00000deba1..a4aed677bfd0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -171,6 +171,7 @@ case class StaticInvoke(
 * @param arguments An optional list of expressions, whos evaluation will be passed to the function.
 * @param propagateNull When true, and any of the arguments is null, null will be returned instead
 *                      of calling the function.
+ * @param returnNullable When false, return value must be non-null.
 */
 case class Invoke(
@@ -180,7 +181,7 @@ case class Invoke(
     propagateNull: Boolean = true,
     returnNullable : Boolean = true) extends InvokeLike {
 
-  override def nullable: Boolean = targetObject.nullable || returnNullable
+  override def nullable: Boolean = targetObject.nullable || needNullCheck || returnNullable
   override def children: Seq[Expression] = targetObject +: arguments
 
   override def eval(input: InternalRow): Any =
@@ -406,8 +407,8 @@ case class WrapOption(child: Expression, optType: DataType)
 * A place holder for the loop variable used in [[MapObjects]]. This should never be constructed
 * manually, but will instead be passed into the provided lambda function.
 */
-case class LambdaVariable(value: String, isNull: String, dataType: DataType,
-    valueNullable: Boolean = true) extends LeafExpression
+case class LambdaVariable(value: String, isNull: String, dataType: DataType, valueNullable: Boolean)
+  extends LeafExpression
   with Unevaluable with NonSQLExpression {
 
   override def nullable: Boolean = valueNullable
@@ -433,7 +434,7 @@ object MapObjects {
       elementType: DataType): MapObjects = {
     val loopValue = "MapObjects_loopValue" + curId.getAndIncrement()
     val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement()
-    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType, true)
     MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)
   }
 }

From 2a1287a84cb303a8df9f8c310aad154e04b6b4d4 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 28 Nov 2016 17:45:15 +0900
Subject: [PATCH 17/25] addressed review comment

---
 .../spark/sql/catalyst/expressions/objects/objects.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index a4aed677bfd0a..966fcf2c367de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -407,12 +407,10 @@ case class WrapOption(child: Expression, optType: DataType)
 * A place holder for the loop variable used in [[MapObjects]]. This should never be constructed
 * manually, but will instead be passed into the provided lambda function.
 */
-case class LambdaVariable(value: String, isNull: String, dataType: DataType, valueNullable: Boolean)
-  extends LeafExpression
+case class LambdaVariable(value: String, isNull: String, dataType: DataType,
+    nullable: Boolean = true) extends LeafExpression
   with Unevaluable with NonSQLExpression {
 
-  override def nullable: Boolean = valueNullable
-
   override def genCode(ctx: CodegenContext): ExprCode = {
     ExprCode(code = "", value = value, isNull = if (nullable) isNull else "false")
   }
@@ -434,7 +432,7 @@ object MapObjects {
       elementType: DataType): MapObjects = {
     val loopValue = "MapObjects_loopValue" + curId.getAndIncrement()
     val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement()
-    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType, true)
+    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
     MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)
   }
 }

From 3ca766986bc8bcc1aeb57e4815040676b08f9f15 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 28 Nov 2016 18:02:19 +0900
Subject: [PATCH 18/25] addressed review comments

---
 .../org/apache/spark/sql/catalyst/JavaTypeInference.scala | 3 +--
 .../spark/sql/catalyst/expressions/objects/objects.scala  | 5 ++---
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 7464aad35f45c..6763e9d69f2f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -396,7 +396,6 @@ object JavaTypeInference {
       case _ if mapType.isAssignableFrom(typeToken) =>
         val (keyType, valueType) = mapKeyValueType(typeToken)
-        val (_, valueNullable) = inferDataType(valueType)
 
         ExternalMapToCatalyst(
           inputObject,
           ObjectType(keyType.getRawType),
           serializerFor(_, keyType),
           ObjectType(valueType.getRawType),
           serializerFor(_, valueType),
-          valueNullable
+          true
         )
 
       case other =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 966fcf2c367de..012118dfa3d45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -645,9 +645,8 @@ case class ExternalMapToCatalyst private(
 
   override def foldable: Boolean = false
 
-  override def dataType: MapType = {
-    MapType(keyConverter.dataType, valueConverter.dataType, valueConverter.nullable)
-  }
+  override def dataType: MapType = MapType(
+    keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable)
 
   override def eval(input: InternalRow): Any =
     throw new UnsupportedOperationException("Only code-generated evaluation is supported")
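Note on PATCH 16-18: after this round, ExternalMapToCatalyst.dataType derives valueContainsNull directly from the value converter's nullability. The rule can be stated with public types only (sketch; mapTypeFor is a hypothetical helper, not Spark API):

    import org.apache.spark.sql.types._

    def mapTypeFor(key: DataType, value: DataType, valueConverterNullable: Boolean): MapType =
      MapType(key, value, valueContainsNull = valueConverterNullable)

    println(mapTypeFor(IntegerType, IntegerType, valueConverterNullable = false)) // MapType(IntegerType,IntegerType,false)
    println(mapTypeFor(IntegerType, StringType,  valueConverterNullable = true))  // MapType(IntegerType,StringType,true)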
From b7bf966a808668c08787c39632fc4634c9a8d3da Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 28 Nov 2016 18:04:54 +0900
Subject: [PATCH 19/25] addressed review comment

---
 .../sql/catalyst/expressions/ReferenceToExpressions.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index efc66fdd8dd6b..2ca77e8394e17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -74,8 +74,7 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression])
       ctx.addMutableState("boolean", classChildVarIsNull, "")
 
       val classChildVar =
-        LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType,
-          childGen.isNull != "false")
+        LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType, child.nullable)
 
       val initCode = s"${classChildVar.value} = ${childGen.value};\n" +
         s"${classChildVar.isNull} = ${childGen.isNull};"

From d409d4637ab8e2bc241f1968e46903b0c2cb77d3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 29 Nov 2016 23:39:04 +0900
Subject: [PATCH 20/25] addressed review comment

---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index f8b2997cf8064..38703daaa7ef6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -591,11 +591,9 @@ object ScalaReflection extends ScalaReflection {
             "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
         }
 
-        // primitive take only non-null or struct takes non-null object guarded by isNull
-        val returnNonNull = fieldType.typeSymbol.asClass.isPrimitive ||
-          definedByConstructorParams(fieldType)
-        val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
-          returnNullable = !returnNonNull)
+        val fieldValue = Invoke(
+          AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType),
+          returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil

From 39e4930cc7dc1ebf36b8d517a030072ebf0a7df3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 30 Nov 2016 04:32:04 +0900
Subject: [PATCH 21/25] revert the previous change; rephrase a comment

---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 38703daaa7ef6..f134b7dfe0926 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -591,9 +591,12 @@ object ScalaReflection extends ScalaReflection {
             "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
         }
 
-        val fieldValue = Invoke(
-          AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType),
-          returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
+        // we know primitive type takes only non-null, or
+        // we can infer correct nullability for struct's fieldValue by a guard using If(IsNull())
+        val returnNonNull = fieldType.typeSymbol.asClass.isPrimitive ||
+          definedByConstructorParams(fieldType)
+        val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
+          returnNullable = !returnNonNull)
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
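Note on PATCH 19-21: the open question in this stretch is how a field access on a possibly-null outer object should be typed. The reason a primitive field getter can still be marked non-nullable is visible with plain Java reflection (standalone sketch, runnable e.g. when pasted into the Scala REPL):

    // A primitive accessor can never return null: it returns an unboxed value.
    case class Point(x: Int, s: String)

    val xGetter = classOf[Point].getMethod("x")
    val sGetter = classOf[Point].getMethod("s")
    println(xGetter.getReturnType.isPrimitive) // true  -> Invoke(..., returnNullable = false) is safe
    println(sGetter.getReturnType.isPrimitive) // false -> the field stays nullable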
From 93d43548c2c71dba5b682d8d739f6d7c5889a8ba Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 1 Dec 2016 04:34:03 +0900
Subject: [PATCH 22/25] addressed review comments

---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala  | 3 ++-
 .../spark/sql/catalyst/expressions/objects/objects.scala | 7 +++++--
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala   | 4 ++--
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index f134b7dfe0926..c4e6379c92376 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -592,7 +592,8 @@ object ScalaReflection extends ScalaReflection {
         }
 
         // we know primitive type takes only non-null, or
-        // we can infer correct nullability for struct's fieldValue by a guard using If(IsNull())
+        // we can infer correct nullability for struct's fieldValue by a guard
+        // using If(IsNull()) at the last line of this section
         val returnNonNull = fieldType.typeSymbol.asClass.isPrimitive ||
           definedByConstructorParams(fieldType)
         val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 012118dfa3d45..2dadd064c4cc1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -171,7 +171,7 @@ case class StaticInvoke(
 * @param arguments An optional list of expressions, whos evaluation will be passed to the function.
 * @param propagateNull When true, and any of the arguments is null, null will be returned instead
 *                      of calling the function.
- * @param returnNullable When false, return value must be non-null.
+ * @param returnNullable When false, indicating the invoked method will return non-null value.
 */
 case class Invoke(
@@ -407,7 +407,10 @@ case class WrapOption(child: Expression, optType: DataType)
 * A place holder for the loop variable used in [[MapObjects]]. This should never be constructed
 * manually, but will instead be passed into the provided lambda function.
 */
-case class LambdaVariable(value: String, isNull: String, dataType: DataType,
+case class LambdaVariable(
+    value: String,
+    isNull: String,
+    dataType: DataType,
     nullable: Boolean = true) extends LeafExpression
   with Unevaluable with NonSQLExpression {
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index f6f0d9a96e506..a8df920ba71fd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -339,8 +339,8 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
   }
 
   test("nullable of encoder serializer") {
-    def checkNullable[T: Encoder](nullable: Boolean*): Unit = {
-      assert(encoderFor[T].serializer.map(_.nullable) === nullable.toSeq)
+    def checkNullable[T: Encoder](nullable: Boolean): Unit = {
+      assert(encoderFor[T].serializer.map(_.nullable) === nullable)
     }
 
     // test for flat encoders

From 737b21f5ed2370ecddab08b97b58987917a530d2 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 1 Dec 2016 18:30:26 +0900
Subject: [PATCH 23/25] addressed review comments

---
 .../apache/spark/sql/catalyst/ScalaReflection.scala    | 10 +++-------
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala |  7 ++++++-
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index c4e6379c92376..38703daaa7ef6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -591,13 +591,9 @@ object ScalaReflection extends ScalaReflection {
             "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
         }
 
-        // we know primitive type takes only non-null, or
-        // we can infer correct nullability for struct's fieldValue by a guard
-        // using If(IsNull()) at the last line of this section
-        val returnNonNull = fieldType.typeSymbol.asClass.isPrimitive ||
-          definedByConstructorParams(fieldType)
-        val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType),
-          returnNullable = !returnNonNull)
+        val fieldValue = Invoke(
+          AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType),
+          returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
         val clsName = getClassNameFromType(fieldType)
         val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
         expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index a8df920ba71fd..080f11b769388 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -300,6 +300,11 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
   encodeDecodeTest(
     ReferenceValueClass(ReferenceValueClass.Container(1)), "reference value class")
 
+  encodeDecodeTest(Option(31), "option of int")
+  encodeDecodeTest(Option.empty[Int], "empty option of int")
+  encodeDecodeTest(Option("abc"), "option of string")
+  encodeDecodeTest(Option.empty[String], "empty option of string")
+
   productTest(("UDT", new ExamplePoint(0.1, 0.2)))
 
   test("nullable of encoder schema") {
@@ -340,7 +345,7 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
 
   test("nullable of encoder serializer") {
     def checkNullable[T: Encoder](nullable: Boolean): Unit = {
-      assert(encoderFor[T].serializer.map(_.nullable) === nullable)
+      assert(encoderFor[T].serializer.forall(_.nullable === nullable))
     }
 
     // test for flat encoders

From c9256660cb934ad4bf668c25cd078be7fa8a310b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 1 Dec 2016 22:12:39 +0900
Subject: [PATCH 24/25] addressed review comments

---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 3 ++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 2dadd064c4cc1..5437fe41d021d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -171,7 +171,8 @@ case class StaticInvoke(
 * @param arguments An optional list of expressions, whos evaluation will be passed to the function.
 * @param propagateNull When true, and any of the arguments is null, null will be returned instead
 *                      of calling the function.
- * @param returnNullable When false, indicating the invoked method will return non-null value.
+ * @param returnNullable When false, indicating the invoked method will always return
+ *                        non-null value.
 */
 case class Invoke(
     targetObject: Expression,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2b194f9305dd7..c66a8a077f414 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -988,10 +988,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val df5 = Seq((0, 1.0), (2, 2.0)).toDF("id", "v")
     assert(df5.schema(0).nullable == false)
     assert(df5.schema(1).nullable == false)
-    val df6 = Seq((0, 1.0, 3.0), (2, 2.0, 5.0)).toDF("id", "v1", "v2")
+    val df6 = Seq((0, 1.0, "a"), (2, 2.0, "b")).toDF("id", "v1", "v2")
     assert(df6.schema(0).nullable == false)
     assert(df6.schema(1).nullable == false)
-    assert(df6.schema(2).nullable == false)
+    assert(df6.schema(2).nullable == true)
 
     val df7 = (Tuple1(Array(1, 2, 3)) :: Nil).toDF("a")
     assert(df7.schema(0).nullable == true)
     assert(df7.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)

From 296b5af6cd51fc3b1d94f1909ee2c74ba405423c Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 1 Dec 2016 23:12:17 +0900
Subject: [PATCH 25/25] addressed review comments

---
 .../scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala | 2 +-
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala   | 2 +-
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 6763e9d69f2f9..7e8e4dab72145 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -403,7 +403,7 @@ object JavaTypeInference {
           serializerFor(_, keyType),
           ObjectType(valueType.getRawType),
           serializerFor(_, valueType),
-          true
+          valueNullable = true
         )
 
       case other =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 38703daaa7ef6..7da7b3ada0db6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -499,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
           serializerFor(_, keyType, keyPath),
           dataTypeFor(valueType),
           serializerFor(_, valueType, valuePath),
-          !valueType.typeSymbol.asClass.isPrimitive)
+          valueNullable = !valueType.typeSymbol.asClass.isPrimitive)
 
       case t if t <:< localTypeOf[String] =>
         StaticInvoke(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5437fe41d021d..7482803a9adef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -172,7 +172,7 @@ case class StaticInvoke(
 * @param propagateNull When true, and any of the arguments is null, null will be returned instead
 *                      of calling the function.
 * @param returnNullable When false, indicating the invoked method will always return
- *                        non-null value.
+ *                       non-null value.
 */
 case class Invoke(
     targetObject: Expression,
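Closing note: taken together, the series makes the nullability reported by Dataset/DataFrame schemas match what the JVM types can actually produce. A hedged end-to-end check, assuming a spark-shell session (where `spark` and its implicits are in scope):

    import spark.implicits._
    import org.apache.spark.sql.types._

    val df = Seq((1, 2.0, "a", Seq(1, 2), Map("k" -> 3))).toDF("i", "d", "s", "arr", "m")
    println(df.schema("i").nullable)  // false: unboxed Int
    println(df.schema("d").nullable)  // false: unboxed Double
    println(df.schema("s").nullable)  // true:  a String reference may be null
    println(df.schema("arr").dataType.asInstanceOf[ArrayType].containsNull)  // false: Seq[Int] elements
    println(df.schema("m").dataType.asInstanceOf[MapType].valueContainsNull) // false: Int map values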