From 460948d0a73af42c00b6233948f41575e8bff562 Mon Sep 17 00:00:00 2001
From: Uros Stankovic
Date: Wed, 2 Oct 2024 11:22:37 +0200
Subject: [PATCH 1/2] Respect user schema nullability for file data sources
 when using TableProvider

---
 .../apache/spark/sql/internal/SQLConf.scala   | 12 +++++++
 .../execution/datasources/v2/FileTable.scala  |  2 ++
 .../datasources/json/JsonSuite.scala          | 32 +++++++++++++++++++
 3 files changed, 46 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ea187c0316c17..6743b357a96f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3902,6 +3902,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES =
+    buildConf("spark.sql.respectUserSchemaNullabilityForFileDataSources")
+      .internal()
+      .doc("When true, the nullability in the user-specified schema for " +
+        "`DataFrameReader.schema(schema).json(path)`, `.csv(path)` and `.xml(path)` is " +
+        "respected. Otherwise, the schema is forcibly turned into a nullable one.")
+      .version("4.0.0")
+      .fallbackConf(LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION)
+
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
       "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
@@ -6022,6 +6031,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
     getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR).getOrElse(!ansiEnabled)
   }
 
+  def respectUserSchemaNullabilityForFileDataSources: Boolean =
+    getConf(SQLConf.RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES)
+
   def isReplEagerEvalEnabled: Boolean = getConf(SQLConf.REPL_EAGER_EVAL_ENABLED)
 
   def replEagerEvalMaxNumRows: Int = getConf(SQLConf.REPL_EAGER_EVAL_MAX_NUM_ROWS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 4eee731e0b2d6..c9fb0db2664d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
@@ -74,6 +75,7 @@ abstract class FileTable(
     }
     fileIndex match {
       case _: MetadataLogFileIndex => schema
+      case _ if SQLConf.get.respectUserSchemaNullabilityForFileDataSources => schema
       case _ => schema.asNullable
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index aea95f0af117a..78c4bbd8ce12b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3354,6 +3354,38 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-: Read multi line JSON with default mode and provided schema") {
+    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+      val schema = StructType(Seq(
+        StructField("f1", LongType, nullable = true),
+        StructField("f2", StringType, nullable = false)
+      ))
+
+      withTempPath(file => {
+        val fileRawValue =
+          """
+            |{
+            |  "f1": 1200
+            |}
+            |""".stripMargin
+
+        Seq(fileRawValue).toDF()
+          .repartition(1)
+          .write
+          .text(file.getAbsolutePath)
+
+        val df = spark.read
+          .schema(schema)
+          .option("multiline", "true")
+          .json(file.getAbsolutePath)
+
+        val schemaStr = df.schema.treeString
+
+        assert(schemaStr.contains("f2: string (nullable = false)"))
+      })
+    }
+  }
+
   test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
     val exception = intercept[SparkException] {
       spark.read.option("mode", "failfast")
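To make the intended behavior of this first patch concrete, here is a minimal sketch (not part of the patch itself) of how the flag surfaces to users. It assumes a local SparkSession, a hypothetical /tmp path, and that the JSON read actually goes through the V2 FileTable code path, which depends on spark.sql.sources.useV1SourceList:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object NullabilitySketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits._

  // Write a single JSON record to a hypothetical location.
  Seq("""{"f1": 1200, "f2": "x"}""").toDF().write.text("/tmp/nullability-sketch")

  val schema = StructType(Seq(
    StructField("f1", LongType, nullable = true),
    StructField("f2", StringType, nullable = false)))

  val df = spark.read.schema(schema).json("/tmp/nullability-sketch")

  // Without the flag, FileTable#dataSchema applies schema.asNullable, so this
  // prints "true" even though the user asked for nullable = false; with the
  // flag (or the legacy fallback conf) enabled, it should print "false".
  println(df.schema("f2").nullable)
}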
From f64be00f285b6c826dc9e47ba7b2b2acb0dfb922 Mon Sep 17 00:00:00 2001
From: Uros Stankovic
Date: Mon, 7 Oct 2024 15:25:10 +0200
Subject: [PATCH 2/2] Add more tests

---
 .../apache/spark/sql/internal/SQLConf.scala |  6 +-
 .../datasources/json/JsonSuite.scala        | 68 +++++++++++++------
 2 files changed, 51 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6743b357a96f7..5ececc7e3c97e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3903,13 +3903,13 @@ object SQLConf {
     .createWithDefault(false)
 
   val RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES =
-    buildConf("spark.sql.respectUserSchemaNullabilityForFileDataSources")
-      .internal()
+    buildConf("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath")
       .doc("When true, the nullability in the user-specified schema for " +
         "`DataFrameReader.schema(schema).json(path)`, `.csv(path)` and `.xml(path)` is " +
         "respected. Otherwise, the schema is forcibly turned into a nullable one.")
       .version("4.0.0")
-      .fallbackConf(LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION)
+      .booleanConf
+      .createWithDefault(false)
 
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 78c4bbd8ce12b..f4f4d7c22e7f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3354,35 +3354,63 @@ abstract class JsonSuite
     }
   }
 
-  test("SPARK-: Read multi line JSON with default mode and provided schema") {
-    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+  test("SPARK-49893: Validate user schema nullability is respected when file is read") {
+    withSQLConf(SQLConf.RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES.key -> "true") {
       val schema = StructType(Seq(
         StructField("f1", LongType, nullable = true),
-        StructField("f2", StringType, nullable = false)
+        StructField("f2", LongType, nullable = false)
       ))
 
-      withTempPath(file => {
-        val fileRawValue =
-          """
-            |{
-            |  "f1": 1200
-            |}
-            |""".stripMargin
+      val invalidFiles = Seq(
+        """ {"f1": 1200} """, // Field is missing
+        """ {"f1": 1200, "f2": null} """ // Field is null
+      )
 
-        Seq(fileRawValue).toDF()
-          .repartition(1)
-          .write
-          .text(file.getAbsolutePath)
+      invalidFiles.foreach(invalidFileContent => {
+        withTempPath(file => {
+          Seq(invalidFileContent).toDF()
+            .repartition(1)
+            .write
+            .text(file.getAbsolutePath)
 
-        val df = spark.read
-          .schema(schema)
-          .option("multiline", "true")
-          .json(file.getAbsolutePath)
+          val df = spark.read
+            .schema(schema)
+            .json(file.getAbsolutePath)
 
-        val schemaStr = df.schema.treeString
+          val schemaStr = df.schema.treeString
+          assert(schemaStr.contains("f2: long (nullable = false)"))
 
-        assert(schemaStr.contains("f2: string (nullable = false)"))
+          assertThrows[SparkException] {
+            df.collect()
+          }
+        })
       })
+
+      // Sequence of tuples, where the first tuple element is the file content
+      // and the second tuple element is the expected value after parsing.
+      val validFiles = Seq(
+        (""" {"f1": 1200, "f2": 1000} """, 1000L), // Non-default value for non-nullable field f2
+        (""" {"f1": 1200, "f2": 0} """, 0L) // Default value for non-nullable field f2
+      )
+
+      validFiles.foreach {
+        case (validFileContent, expectedFieldValue) =>
+          withTempPath(file => {
+            Seq(validFileContent).toDF()
+              .repartition(1)
+              .write
+              .text(file.getAbsolutePath)
+
+            val df = spark.read
+              .schema(schema)
+              .json(file.getAbsolutePath)
+
+            val schemaStr = df.schema.treeString
+            assert(schemaStr.contains("f2: long (nullable = false)"))
+
+            checkAnswer(df, Row(1200L, expectedFieldValue))
+          })
+      }
     }
   }
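Taken together, the tests in this second patch pin down the user-facing contract: with the flag enabled, the read schema keeps the declared nullability, and a null in a non-nullable column is rejected only when the data is materialized. A minimal sketch of that contract from the user's point of view, assuming the renamed conf key from this patch and a hypothetical /tmp path:

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object NullabilityContractSketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits._

  spark.conf.set("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath", "true")

  // A record that violates the non-nullable field f2.
  Seq("""{"f1": 1200, "f2": null}""").toDF().write.text("/tmp/nullability-contract")

  val schema = StructType(Seq(
    StructField("f1", LongType, nullable = true),
    StructField("f2", LongType, nullable = false)))

  val df = spark.read.schema(schema).json("/tmp/nullability-contract")

  // The analyzed schema should keep the user's nullability verbatim...
  assert(!df.schema("f2").nullable)

  // ...and collecting the data should fail, mirroring the assertThrows above.
  try df.collect()
  catch { case e: SparkException => println(s"Rejected as expected: ${e.getMessage}") }
}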