From 1abaf1baeb8e45586b4fbcd8bc2786259e1170ab Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 11 Nov 2016 00:48:22 +0900
Subject: [PATCH] Turning to nullable schema when it is set in DataFrameReader

---
 python/pyspark/sql/streaming.py                        |  2 +-
 .../scala/org/apache/spark/sql/DataFrameReader.scala   |  2 +-
 .../spark/sql/streaming/DataStreamReader.scala         |  2 +-
 .../spark/sql/streaming/FileStreamSourceSuite.scala    | 12 ++++++++++++
 .../spark/sql/test/DataFrameReaderWriterSuite.scala    |  8 ++++++++
 5 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5014299ad220f..82a14dc0b17f4 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -839,7 +839,7 @@ def _test():
     globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
     globs['sdf'] = \
         spark.readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
+    globs['sdf_schema'] = StructType([StructField("data", StringType(), True)])
     globs['df'] = \
         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 365b50dee93c4..213af889a61b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -60,7 +60,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def schema(schema: StructType): DataFrameReader = {
-    this.userSpecifiedSchema = Option(schema)
+    this.userSpecifiedSchema = Option(schema.asNullable)
     this
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7db9d9264b1c9..630282b4efc00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -53,7 +53,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(schema)
+    this.userSpecifiedSchema = Option(schema.asNullable)
     this
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8a9fa94bea601..058211aea944c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1191,6 +1191,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("Force user specified schema to the nullable") {
+    withTempDir { src =>
+      stringToFile(new File(src, "1"), "a\nb\nc")
+      val userSchema =
+        StructType(StructField("a", StringType, nullable = false) :: Nil)
+      val schema = createFileStreamSourceAndGetSchema(
+        format = Some("parquet"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
+      assert(schema != userSchema)
+      assert(schema === userSchema.asNullable)
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4bec2e3fdb9d3..e0fa753286ec1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -635,4 +635,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("SPARK-16472: Force user specified schema to the nullable") {
+    val schema = new StructType().add("s", StringType, nullable = false)
+    val nullableSchema = schema.asNullable
+    val df = spark.read.schema(schema).text()
+    assert(df.schema != schema)
+    assert(df.schema == nullableSchema)
+  }
 }