From ec6aa5095138cd6cd22109096463182f326e15f9 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 15 Mar 2017 19:09:12 +0900
Subject: [PATCH 1/9] initial commit

---
 .../sql/catalyst/expressions/BoundAttribute.scala   |  6 +++++-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 11 +++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
 mode change 100644 => 100755 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 mode change 100644 => 100755 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
old mode 100644
new mode 100755
index 7d16118c9d59f..ab9cbadea9039
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -63,7 +63,11 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
     val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
     if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) {
       val oev = ctx.currentVars(ordinal)
-      ev.isNull = oev.isNull
+      ev.isNull = if (!nullable || ctx.INPUT_ROW == null) {
+        oev.isNull
+      } else {
+        s"((${oev.isNull}) || ${ctx.INPUT_ROW}.isNullAt($ordinal))"
+      }
       ev.value = oev.value
       val code = oev.code
       oev.code = ""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
old mode 100644
new mode 100755
index 52bd4e19f8952..81f3d5890d1d5
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1722,4 +1722,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         "Cannot have map type columns in DataFrame which calls set operations"))
     }
   }
+
+  test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
+    val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
+    assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))
+    val dfLong = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
+    assert(dfLong.collect === Array(Row(0L), Row(null), Row(2L)))
+    val dfFloat = sparkContext.parallelize(Seq[java.lang.Float](0.0F, null, 2.0F), 1).toDF
+    assert(dfFloat.collect === Array(Row(0.0F), Row(null), Row(2.0F)))
+    val dfDouble = sparkContext.parallelize(Seq[java.lang.Double](0.0D, null, 2.0D), 1).toDF
+    assert(dfDouble.collect === Array(Row(0.0D), Row(null), Row(2.0D)))
+  }
 }
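
For context, the failure this series fixes is easy to reproduce interactively. A minimal sketch of the symptom, assuming a spark-shell session that provides `spark` and `sc` (not part of the patch):

    import spark.implicits._

    // Before the fix, the column was treated as non-nullable, so the
    // generated code unboxed the null element and collect() threw a
    // NullPointerException instead of returning Row(null).
    val df = sc.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
    df.collect().foreach(println)   // expected after the fix: [0], [null], [2]
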
From 2deeba8ce246ffad26097f74525c1f0fbe4237ef Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 16 Mar 2017 01:38:45 +0900
Subject: [PATCH 2/9] fix test failures

---
 .../spark/sql/catalyst/expressions/BoundAttribute.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
 mode change 100755 => 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
old mode 100755
new mode 100644
index ab9cbadea9039..91de396f03f27
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -63,10 +63,11 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
     val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
     if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) {
       val oev = ctx.currentVars(ordinal)
-      ev.isNull = if (!nullable || ctx.INPUT_ROW == null) {
+      ev.isNull = if (!nullable || oev.isNull != "false" || ctx.INPUT_ROW == null) {
         oev.isNull
       } else {
-        s"((${oev.isNull}) || ${ctx.INPUT_ROW}.isNullAt($ordinal))"
+        // generate a null check if the input value is statically non-null but the expression is nullable
+        s"${ctx.INPUT_ROW}.isNullAt($ordinal)"
       }
       ev.value = oev.value
       val code = oev.code

From fd4fc3d2058a1610f1429c89fe7c1146c42c7328 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Mar 2017 02:58:16 +0900
Subject: [PATCH 3/9] ExternalRDD.apply() sets nullable to true if T in RDD[T] is a non-primitive type

---
 .../catalyst/expressions/BoundAttribute.scala |  7 +------
 .../spark/sql/execution/ExistingRDD.scala     | 17 +++++++++++++++--
 .../spark/sql/DataFrameImplicitsSuite.scala   | 11 +++++++++++
 .../org/apache/spark/sql/DataFrameSuite.scala | 11 -----------
 4 files changed, 27 insertions(+), 19 deletions(-)
 mode change 100755 => 100644 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 91de396f03f27..7d16118c9d59f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -63,12 +63,7 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
     val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
     if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) {
       val oev = ctx.currentVars(ordinal)
-      ev.isNull = if (!nullable || oev.isNull != "false" || ctx.INPUT_ROW == null) {
-        oev.isNull
-      } else {
-        // generate a null check if the input value is statically non-null but the expression is nullable
-        s"${ctx.INPUT_ROW}.isNullAt($ordinal)"
-      }
+      ev.isNull = oev.isNull
       ev.value = oev.value
       val code = oev.code
       oev.code = ""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 49336f424822f..6f978dd7887e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 object RDDConversions {
@@ -70,7 +70,20 @@ object RDDConversions {
 
 object ExternalRDD {
   def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
-    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
+    val attr = {
+      val attr = CatalystSerde.generateObjAttr[T]
+      // Since ExpressionEncoder[T].deserializer is not resolved here,
+      // we cannot access ExpressionEncoder[T].deserializer.nullable.
+      // We infer nullability from the DataType instead.
+      attr.dataType match {
+        case BooleanType => attr
+        case _: IntegralType => attr
+        case FloatType | DoubleType => attr
+        case DecimalType.Fixed(p, s) if p <= Decimal.MAX_LONG_DIGITS => attr
+        case _ => attr.withNullability(true)
+      }
+    }
+    val externalRdd = ExternalRDD(attr, rdd)(session)
     CatalystSerde.serialize[T](externalRdd)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
index 094efbaeadcd5..dd4a2e2e338d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
@@ -51,4 +51,15 @@ class DataFrameImplicitsSuite extends QueryTest with SharedSQLContext {
       sparkContext.parallelize(1 to 10).map(_.toString).toDF("stringCol"),
       (1 to 10).map(i => Row(i.toString)))
   }
+
+  test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
+    val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
+    assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))
+    val dfLong = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
+    assert(dfLong.collect === Array(Row(0L), Row(null), Row(2L)))
+    val dfFloat = sparkContext.parallelize(Seq[java.lang.Float](0.0F, null, 2.0F), 1).toDF
+    assert(dfFloat.collect === Array(Row(0.0F), Row(null), Row(2.0F)))
+    val dfDouble = sparkContext.parallelize(Seq[java.lang.Double](0.0D, null, 2.0D), 1).toDF
+    assert(dfDouble.collect === Array(Row(0.0D), Row(null), Row(2.0D)))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
old mode 100755
new mode 100644
index 81f3d5890d1d5..52bd4e19f8952
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1722,15 +1722,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         "Cannot have map type columns in DataFrame which calls set operations"))
     }
   }
-
-  test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
-    val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
-    assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))
-    val dfLong = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
-    assert(dfLong.collect === Array(Row(0L), Row(null), Row(2L)))
-    val dfFloat = sparkContext.parallelize(Seq[java.lang.Float](0.0F, null, 2.0F), 1).toDF
-    assert(dfFloat.collect === Array(Row(0.0F), Row(null), Row(2.0F)))
-    val dfDouble = sparkContext.parallelize(Seq[java.lang.Double](0.0D, null, 2.0D), 1).toDF
-    assert(dfDouble.collect === Array(Row(0.0D), Row(null), Row(2.0D)))
-  }
 }
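
The match in ExternalRDD.apply keeps the attribute non-nullable exactly for the Catalyst types whose values generated code holds unboxed, including decimals compact enough to fit in a long. A standalone sketch of the same predicate (the helper name is illustrative, not Spark API; the precision guard is the public-API equivalent of the patch's DecimalType.Fixed(p, s) pattern, which is package-private to org.apache.spark.sql):

    import org.apache.spark.sql.types._

    // True when the Catalyst type is backed by a JVM primitive, so a value
    // stored in an InternalRow slot can never itself be null.
    def isPrimitiveBacked(dt: DataType): Boolean = dt match {
      case BooleanType | FloatType | DoubleType => true
      case _: IntegralType => true // ByteType, ShortType, IntegerType, LongType
      case d: DecimalType if d.precision <= Decimal.MAX_LONG_DIGITS => true // fits in a long
      case _ => false
    }
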
From e81b0cb7415beb875921ea707bd4c78d8e38eecb Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 19:12:22 +0900
Subject: [PATCH 4/9] move nullable inference logic to CatalystSerde.generateObjAttr

---
 .../sql/catalyst/plans/logical/object.scala | 15 ++++++++++++++-
 .../spark/sql/execution/ExistingRDD.scala   | 17 +++--------------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 7f4462e583607..b4c07a0317d1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -41,7 +41,20 @@ object CatalystSerde {
   }
 
   def generateObjAttr[T : Encoder]: Attribute = {
-    AttributeReference("obj", encoderFor[T].deserializer.dataType, nullable = false)()
+    val deserializer = encoderFor[T].deserializer
+    val dataType = deserializer.dataType
+    val nullable = if (deserializer.childrenResolved) {
+      deserializer.nullable
+    } else {
+      // Since the deserializer is not resolved here, we cannot access deserializer.nullable.
+      // We infer nullability from the DataType instead.
+      dataType match {
+        case BooleanType | FloatType | DoubleType => false
+        case _: IntegralType => false
+        case _ => true
+      }
+    }
+    AttributeReference("obj", dataType, nullable = nullable)()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6f978dd7887e0..889bfc1c680ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,6 +17,8 @@ package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
@@ -70,20 +72,7 @@ object RDDConversions {
 
 object ExternalRDD {
   def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
-    val attr = {
-      val attr = CatalystSerde.generateObjAttr[T]
-      // Since ExpressionEncoder[T].deserializer is not resolved here,
-      // we cannot access ExpressionEncoder[T].deserializer.nullable.
-      // We infer nullability from the DataType instead.
-      attr.dataType match {
-        case BooleanType => attr
-        case _: IntegralType => attr
-        case FloatType | DoubleType => attr
-        case DecimalType.Fixed(p, s) if p <= Decimal.MAX_LONG_DIGITS => attr
-        case _ => attr.withNullability(true)
-      }
-    }
-    val externalRdd = ExternalRDD(attr, rdd)(session)
+    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
     CatalystSerde.serialize[T](externalRdd)
   }
 }
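
Worth noting about the fallback branch: a DataType alone cannot distinguish a primitive from its box, since scala.Long and java.lang.Long both encode to LongType. A hypothetical standalone rendering of the inference (the helper name is illustrative, not Spark API), with that caveat spelled out:

    import org.apache.spark.sql.types._

    // Fallback used only when the deserializer is unresolved. Caveat: both
    // scala.Long and java.lang.Long map to LongType, so the DataType alone
    // cannot tell a primitive from a box; only the resolved deserializer
    // (or, later in this series, the encoder's ClassTag) can.
    def inferNullableFromType(dt: DataType): Boolean = dt match {
      case BooleanType | FloatType | DoubleType => false
      case _: IntegralType => false
      case _ => true
    }
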
From 6e3e44dd329d679f973c75385cf8b7680b700ca1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 19:30:44 +0900
Subject: [PATCH 5/9] address review comment

---
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 889bfc1c680ab..49336f424822f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,8 +17,6 @@ package org.apache.spark.sql.execution
 
-import scala.reflect.runtime.universe.TypeTag
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
 
 object RDDConversions {
From d7d0a36f6b4fb78cc0a3a13f870a41b03adf882f Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 22 Mar 2017 01:35:40 +0900
Subject: [PATCH 6/9] Also check resolution myself

---
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index b4c07a0317d1c..8905da7c1e38e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -43,7 +43,7 @@ object CatalystSerde {
   def generateObjAttr[T : Encoder]: Attribute = {
     val deserializer = encoderFor[T].deserializer
     val dataType = deserializer.dataType
-    val nullable = if (deserializer.childrenResolved) {
+    val nullable = if (deserializer.resolved) {
       deserializer.nullable
     } else {
       // Since the deserializer is not resolved here, we cannot access deserializer.nullable.
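
The guard is widened from childrenResolved to resolved because the latter is strictly stronger. Roughly how Catalyst's Expression defines the two (a paraphrased sketch of the Spark 2.x trait, not the complete class):

    import org.apache.spark.sql.catalyst.analysis.TypeCheckResult

    // Paraphrased from org.apache.spark.sql.catalyst.expressions.Expression:
    // `childrenResolved` only says the children are resolved; `resolved`
    // additionally requires this expression's own input types to check out,
    // so it is the safer precondition before trusting `nullable`.
    trait ExpressionSketch {
      def children: Seq[ExpressionSketch]
      def checkInputDataTypes(): TypeCheckResult
      def childrenResolved: Boolean = children.forall(_.resolved)
      lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess
    }
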
From 0a455be971a601c50e11e6d8a7e631f18ad54d76 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 22 Mar 2017 16:22:19 +0900
Subject: [PATCH 7/9] address review comment

---
 .../spark/sql/DataFrameImplicitsSuite.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
index dd4a2e2e338d1..63094d1b6122b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
@@ -53,13 +53,13 @@ class DataFrameImplicitsSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
-    val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
-    assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))
-    val dfLong = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
-    assert(dfLong.collect === Array(Row(0L), Row(null), Row(2L)))
-    val dfFloat = sparkContext.parallelize(Seq[java.lang.Float](0.0F, null, 2.0F), 1).toDF
-    assert(dfFloat.collect === Array(Row(0.0F), Row(null), Row(2.0F)))
-    val dfDouble = sparkContext.parallelize(Seq[java.lang.Double](0.0D, null, 2.0D), 1).toDF
-    assert(dfDouble.collect === Array(Row(0.0D), Row(null), Row(2.0D)))
+    checkAnswer(sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF,
+      Seq(Row(0), Row(null), Row(2)))
+    checkAnswer(sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF,
+      Seq(Row(0L), Row(null), Row(2L)))
+    checkAnswer(sparkContext.parallelize(Seq[java.lang.Float](0.0F, null, 2.0F), 1).toDF,
+      Seq(Row(0.0F), Row(null), Row(2.0F)))
+    checkAnswer(sparkContext.parallelize(Seq[java.lang.Double](0.0D, null, 2.0D), 1).toDF,
+      Seq(Row(0.0D), Row(null), Row(2.0D)))
   }
 }

From 299fd68b05673fbc03f34e4c51151339a57fba2d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 24 Mar 2017 00:35:23 +0900
Subject: [PATCH 8/9] address review comment for get nullable

---
 .../sql/catalyst/plans/logical/object.scala | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 8905da7c1e38e..18ae5803a4fc5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -41,19 +41,9 @@ object CatalystSerde {
   }
 
   def generateObjAttr[T : Encoder]: Attribute = {
-    val deserializer = encoderFor[T].deserializer
-    val dataType = deserializer.dataType
-    val nullable = if (deserializer.resolved) {
-      deserializer.nullable
-    } else {
-      // Since the deserializer is not resolved here, we cannot access deserializer.nullable.
-      // We infer nullability from the DataType instead.
-      dataType match {
-        case BooleanType | FloatType | DoubleType => false
-        case _: IntegralType => false
-        case _ => true
-      }
-    }
+    val enc = encoderFor[T]
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
     AttributeReference("obj", dataType, nullable = nullable)()
   }
 }
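
The ClassTag test cleanly separates scala.Long, which erases to the JVM primitive long, from the boxed java.lang.Long. A quick sketch that can be checked in a plain Scala REPL:

    import scala.reflect.classTag

    // scala.Long erases to the primitive JVM type `long`:
    assert(classTag[Long].runtimeClass.isPrimitive)
    // java.lang.Long is an ordinary reference class, so instances may be null:
    assert(!classTag[java.lang.Long].runtimeClass.isPrimitive)
    // Hence `!enc.clsTag.runtimeClass.isPrimitive` marks exactly the encoders
    // whose decoded objects can legitimately be null.
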
From e1fee6e1c42bed50adb26f399a58028d79267a12 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 24 Mar 2017 01:19:42 +0900
Subject: [PATCH 9/9] addressed review comment

---
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 18ae5803a4fc5..35ae9a28ab7a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -44,7 +44,7 @@ object CatalystSerde {
     val enc = encoderFor[T]
     val dataType = enc.deserializer.dataType
     val nullable = !enc.clsTag.runtimeClass.isPrimitive
-    AttributeReference("obj", dataType, nullable = nullable)()
+    AttributeReference("obj", dataType, nullable)()
   }
 }
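
With the full series applied, the boxed-type round trip behaves as the suite expects. A usage sketch, assuming a SparkSession named `spark` with its implicits in scope:

    import org.apache.spark.sql.Row
    import spark.implicits._

    val df = spark.sparkContext
      .parallelize(Seq[java.lang.Long](0L, null, 2L), 1)
      .toDF
    // The object attribute is now marked nullable, so the generated
    // serializer null-checks the boxed value instead of unboxing it
    // unconditionally, and the null survives the round trip.
    assert(df.collect().toSeq == Seq(Row(0L), Row(null), Row(2L)))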