
Commit 1abaf1b

Turn the user-specified schema into a nullable schema when it is set in DataFrameReader
1 parent 30345c4 commit 1abaf1b

5 files changed, 23 additions and 3 deletions

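In short, DataFrameReader.schema() and DataStreamReader.schema() now keep only the nullable form of whatever schema the caller passes in. A minimal sketch of the resulting behavior, assuming a SparkSession named spark built with this change (the column name "s" is purely illustrative):

import org.apache.spark.sql.types.{StringType, StructType}

// A user schema whose single field is declared non-nullable.
val userSchema = new StructType().add("s", StringType, nullable = false)

// The reader stores the nullable form of the schema, so the resulting
// DataFrame reports its field as nullable even though the caller said otherwise.
val df = spark.read.schema(userSchema).text()
df.schema.foreach(f => println(s"${f.name}: nullable = ${f.nullable}"))  // s: nullable = true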

python/pyspark/sql/streaming.py

Lines changed: 1 addition & 1 deletion
@@ -839,7 +839,7 @@ def _test():
     globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
     globs['sdf'] = \
         spark.readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
+    globs['sdf_schema'] = StructType([StructField("data", StringType(), True)])
     globs['df'] = \
         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def schema(schema: StructType): DataFrameReader = {
-    this.userSpecifiedSchema = Option(schema)
+    this.userSpecifiedSchema = Option(schema.asNullable)
     this
   }
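The one-line change above relies on Spark's internal StructType.asNullable. As a rough illustration of what that nullable form looks like for a flat schema, a hypothetical user-side helper (not part of Spark's public API) could be written as below; the real asNullable additionally recurses into nested structs, arrays and maps:

import org.apache.spark.sql.types.StructType

// Hypothetical stand-in for what the reader now does internally:
// keep field names and types, but mark every top-level field as nullable.
def toNullable(schema: StructType): StructType =
  StructType(schema.fields.map(_.copy(nullable = true)))

// e.g. toNullable(new StructType().add("s", StringType, nullable = false))
//      yields a schema whose "s" field has nullable = true.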

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(schema)
+    this.userSpecifiedSchema = Option(schema.asNullable)
     this
   }
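The streaming reader gets the same treatment. A minimal sketch, assuming a SparkSession named spark and an existing text-source directory at the hypothetical path /tmp/streaming-input:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Declare the single text column as non-nullable on purpose.
val userSchema = StructType(StructField("data", StringType, nullable = false) :: Nil)

// DataStreamReader.schema() now stores the nullable form, so the streaming
// DataFrame reports the field as nullable.
val sdf = spark.readStream.schema(userSchema).format("text").load("/tmp/streaming-input")
assert(sdf.schema.forall(_.nullable))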

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -1191,6 +1191,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("Force user specified schema to the nullable") {
+    withTempDir { src =>
+      stringToFile(new File(src, "1"), "a\nb\nc")
+      val userSchema =
+        StructType(StructField("a", StringType, nullable = false) :: Nil)
+      val schema = createFileStreamSourceAndGetSchema(
+        format = Some("parquet"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
+      assert(schema != userSchema)
+      assert(schema === userSchema.asNullable)
+    }
+  }
 }

 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -635,4 +635,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("SPARK-16472: Force user specified schema to the nullable") {
+    val schema = new StructType().add("s", StringType, nullable = false)
+    val nullableSchema = schema.asNullable
+    val df = spark.read.schema(schema).text()
+    assert(df.schema != schema)
+    assert(df.schema == nullableSchema)
+  }
 }
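A side note on why both asserts in the test above hold: schema equality takes each field's nullability into account, so flipping nullability yields a schema that compares unequal to the original while still having the same names and types. A small sketch of that, independent of any reader:

import org.apache.spark.sql.types.{StringType, StructType}

val strict  = new StructType().add("s", StringType, nullable = false)
val relaxed = new StructType().add("s", StringType, nullable = true)

// Same field name and type, different nullability: the schemas compare unequal.
assert(strict != relaxed)
assert(strict.fieldNames.sameElements(relaxed.fieldNames))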
