
Commit 6689b97
[SPARK-35912][SQL][FOLLOW-UP] Add a legacy configuration for respecting nullability in DataFrame.schema.csv/json(ds)
### What changes were proposed in this pull request?

This PR is a followup of #33436 that adds a legacy configuration. It was found that the change can break a valid use case (https://github.com/apache/spark/pull/33436/files#r863271189):

```scala
import org.apache.spark.sql.types._
val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
).option("mode", "DROPMALFORMED").csv(ds).show()
```

**Before:**

```
+---+---+
| f1| f2|
+---+---+
|  a|  b|
+---+---+
```

**After:**

```
+---+----+
| f1|  f2|
+---+----+
|  a|null|
|  a|   b|
+---+----+
```

This PR adds a configuration to restore the **Before** behaviour.

### Why are the changes needed?

To avoid breaking valid use cases.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new configuration `spark.sql.legacy.respectNullabilityInTextDatasetConversion` (`false` by default) to respect the nullability in `DataFrameReader.schema(schema).csv(dataset)` and `DataFrameReader.schema(schema).json(dataset)` when the user-specified schema is provided.

### How was this patch tested?

Unit tests were added.

Closes #36435 from HyukjinKwon/SPARK-35912.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
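A minimal sketch of restoring the **Before** behaviour with the new flag, assuming a `spark-shell` session (so `spark` and `spark.implicits._` are in scope):

```scala
import org.apache.spark.sql.types._

// Opt back into the pre-3.3 behaviour: respect user-specified nullability.
spark.conf.set("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")

val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
).option("mode", "DROPMALFORMED").csv(ds).show()
// With the flag set, "a," yields a null in the non-nullable field f2, so the
// row is treated as malformed and dropped, reproducing the Before table:
// a single row ("a", "b").
```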
Parent commit: 7c8f702

File tree

5 files changed: +46 −4 lines

docs/sql-migration-guide.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,7 +30,7 @@ license: |

   - Since Spark 3.3, the functions `lpad` and `rpad` have been overloaded to support byte sequences. When the first argument is a byte sequence, the optional padding pattern must also be a byte sequence and the result is a BINARY value. The default padding pattern in this case is the zero byte. To restore the legacy behavior of always returning string types, set `spark.sql.legacy.lpadRpadAlwaysReturnString` to `true`.

-  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
+  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields. To restore the legacy behavior of respecting the nullability, set `spark.sql.legacy.respectNullabilityInTextDatasetConversion` to `true`.

   - Since Spark 3.3, when the date or timestamp pattern is not specified, Spark converts an input string to a date/timestamp using the `CAST` expression approach. The changes affect CSV/JSON datasources and parsing of partition values. In Spark 3.2 or earlier, when the date or timestamp pattern is not set, Spark uses the default patterns: `yyyy-MM-dd` for dates and `yyyy-MM-dd HH:mm:ss` for timestamps. After the changes, Spark still recognizes the pattern together with
```
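To make the new default concrete, a minimal sketch assuming a `spark-shell` session: a user-supplied non-nullable schema comes back nullable from `json(dataset)` unless the legacy flag is set.

```scala
import org.apache.spark.sql.types._

// f1 is declared non-nullable by the user...
val userSchema = StructType(StructField("f1", StringType, nullable = false) :: Nil)
val df = spark.read.schema(userSchema).json(Seq("""{"f1": null}""").toDS)

// ...but under the Spark 3.3 default, the resulting schema is nullable.
df.printSchema()
// root
//  |-- f1: string (nullable = true)
```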

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -3025,6 +3025,17 @@ object SQLConf {
     .intConf
     .createOptional

+  val LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION =
+    buildConf("spark.sql.legacy.respectNullabilityInTextDatasetConversion")
+      .internal()
+      .doc("When true, the nullability in the user-specified schema for " +
+        "`DataFrameReader.schema(schema).json(jsonDataset)` and " +
+        "`DataFrameReader.schema(schema).csv(csvDataset)` is respected. Otherwise, they are " +
+        "turned to a nullable schema forcibly.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
       "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
```

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 11 additions & 2 deletions

```diff
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -406,7 +407,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)

     userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
-    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
+    val schema = userSpecifiedSchema.map {
+      case s if !SQLConf.get.getConf(
+        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
+      case other => other
+    }.getOrElse {
       TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
     }

@@ -478,7 +483,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       None
     }

-    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
+    val schema = userSpecifiedSchema.map {
+      case s if !SQLConf.get.getConf(
+        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
+      case other => other
+    }.getOrElse {
       TextInputCSVDataSource.inferFromDataset(
         sparkSession,
         csvDataset,
```
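Condensed out of `DataFrameReader`, the schema-resolution logic above amounts to the following standalone sketch, where `respectNullability` stands in for the `SQLConf` lookup and `userSpecifiedSchema` is a hypothetical value:

```scala
import org.apache.spark.sql.types._

// Stand-in for SQLConf.get.getConf(LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION).
val respectNullability = false

val userSpecifiedSchema: Option[StructType] = Some(
  StructType(StructField("f1", StringType, nullable = false) :: Nil))

// With the legacy flag off (the default), the user's schema is forced nullable;
// with it on, the declared nullability is kept verbatim.
val schema = userSpecifiedSchema.map {
  case s if !respectNullability => s.asNullable
  case other => other
}.getOrElse(sys.error("would fall back to schema inference here"))

// StructType is a Seq[StructField], so we can map over its fields directly.
println(schema.map(f => s"${f.name}: nullable=${f.nullable}").mkString(", "))
// prints: f1: nullable=true
```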

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 10 additions & 0 deletions

```diff
@@ -2693,6 +2693,16 @@ abstract class CSVSuite
       assert(df.schema == expected)
       checkAnswer(df, Row(1, null) :: Nil)
     }
+
+    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+      checkAnswer(
+        spark.read.schema(
+          StructType(
+            StructField("f1", StringType, nullable = false) ::
+            StructField("f2", StringType, nullable = false) :: Nil)
+        ).option("mode", "DROPMALFORMED").csv(Seq("a,", "a,b").toDS),
+        Row("a", "b"))
+    }
   }

   test("SPARK-36536: use casting when datetime pattern is not set") {
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 13 additions & 1 deletion

```diff
@@ -3165,7 +3165,7 @@ abstract class JsonSuite
     Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
       Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode =>
         val json = spark.createDataset(
-          spark.sparkContext.parallelize(jsonString:: Nil))(Encoders.STRING)
+          spark.sparkContext.parallelize(jsonString :: Nil))(Encoders.STRING)
         val df = spark.read
           .option("mode", mode)
           .schema(schema)
@@ -3174,6 +3174,18 @@ abstract class JsonSuite
         checkAnswer(df, Row(1, null) :: Nil)
       }
     }
+
+    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+      checkAnswer(
+        spark.read.schema(
+          StructType(
+            StructField("f1", LongType, nullable = false) ::
+            StructField("f2", LongType, nullable = false) :: Nil)
+        ).option("mode", "DROPMALFORMED").json(Seq("""{"f1": 1}""").toDS),
+        // It is for testing legacy configuration. This is technically a bug as
+        // `0` has to be `null` but the schema is non-nullable.
+        Row(1, 0))
+    }
   }

   test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
```
