From af508deafa1f1ba3ff46fc98a0e2fbbc81c6a4c8 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 26 Nov 2015 00:26:14 -0800
Subject: [PATCH 1/2] [SPARK-11997] NPE when saving a DataFrame as parquet
 partitioned by a long column

---
 .../org/apache/spark/sql/sources/interfaces.scala | 14 +++++++++++---
 .../datasources/parquet/ParquetQuerySuite.scala   | 13 +++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index f9465157c936d..2a4d8231f6f97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -606,9 +606,17 @@ abstract class HadoopFsRelation private[sql](
       // we need to cast into the data type that user specified.
       def castPartitionValuesToUserSchema(row: InternalRow) = {
         InternalRow((0 until row.numFields).map { i =>
-          Cast(
-            Literal.create(row.getString(i), StringType),
-            userProvidedSchema.fields(i).dataType).eval()
+          row.isNullAt(i) match {
+            case true =>
+              Cast(
+                Literal.create(null, StringType),
+                userProvidedSchema.fields(i).dataType).eval()
+            case false =>
+              Cast(
+                Literal.create(row.getString(i), StringType),
+                userProvidedSchema.fields(i).dataType).eval()
+
+          }
         }: _*)
       }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 70fae32b7e7a1..f777e973052d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -252,6 +252,19 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-11997 parquet with null partition values") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(1, 3)
+        .selectExpr("if(id % 2 = 0, null, id) AS n", "id")
+        .write.partitionBy("n").parquet(path)
+
+      checkAnswer(
+        sqlContext.read.parquet(path).filter("n is null"),
+        Row(2, null))
+    }
+  }
+
   // This test case is ignored because of parquet-mr bug PARQUET-370
   ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets of fields") {
     withTempPath { dir =>
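Note on the failure mode fixed above: InternalRow.getString(i) expands to getUTF8String(i).toString, so a null partition value (read back from the __HIVE_DEFAULT_PARTITION__ directory) makes the scan throw a NullPointerException; PATCH 1/2 guards this by branching on isNullAt before reading the string. The sketch below reproduces just that behavior in isolation. It assumes a 1.6-era spark-catalyst on the classpath; the demo object is illustrative and not part of the patch.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

object GetStringNpeDemo {
  def main(args: Array[String]): Unit = {
    // A partition-values row as parsed from directory names: every field is
    // a string, and a null marks the default (null) partition directory.
    val row = InternalRow.fromSeq(Seq(UTF8String.fromString("1"), null))

    println(row.getString(0)) // "1"
    println(row.isNullAt(1))  // true
    // row.getString(1) would throw a NullPointerException: it expands to
    // getUTF8String(1).toString, and getUTF8String(1) returns null here.
  }
}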
From 4de7697753f0da6810190bea804b9f490a68bb98 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 26 Nov 2015 17:32:23 -0800
Subject: [PATCH 2/2] review comments from Davies

---
 .../org/apache/spark/sql/sources/interfaces.scala | 14 +++----------
 1 file changed, 3 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2a4d8231f6f97..9ace25dc7d21b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -606,17 +606,9 @@ abstract class HadoopFsRelation private[sql](
       // we need to cast into the data type that user specified.
       def castPartitionValuesToUserSchema(row: InternalRow) = {
         InternalRow((0 until row.numFields).map { i =>
-          row.isNullAt(i) match {
-            case true =>
-              Cast(
-                Literal.create(null, StringType),
-                userProvidedSchema.fields(i).dataType).eval()
-            case false =>
-              Cast(
-                Literal.create(row.getString(i), StringType),
-                userProvidedSchema.fields(i).dataType).eval()
-
-          }
+          Cast(
+            Literal.create(row.getUTF8String(i), StringType),
+            userProvidedSchema.fields(i).dataType).eval()
         }: _*)
       }
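Why the PATCH 2/2 simplification is safe: getUTF8String(i) returns null for a null field instead of dereferencing it, and Literal.create(null, StringType) is exactly what the removed true branch built by hand, so Cast propagates the null with no branching at all. A minimal sketch under the same 1.6-era classpath assumption (object name again illustrative), casting string-typed partition values to a user-specified long column:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object CastPartitionValuesDemo {
  def main(args: Array[String]): Unit = {
    // Partition values arrive as strings; the user schema says long.
    val row = InternalRow.fromSeq(Seq(UTF8String.fromString("1"), null))

    val casted = (0 until row.numFields).map { i =>
      // getUTF8String yields null for the null field; Literal.create and
      // Cast both pass that null through instead of crashing on it.
      Cast(Literal.create(row.getUTF8String(i), StringType), LongType).eval()
    }
    println(casted) // Vector(1, null)
  }
}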