diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ea187c0316c17..5ececc7e3c97e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3902,6 +3902,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES =
+    buildConf("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath")
+      .doc("When true, the nullability in the user-specified schema for " +
+        "`DataFrameReader.schema(schema).json(path)`, `.csv(path)` and `.xml(path)` is " +
+        "respected. Otherwise, the schema is forcibly turned into a nullable one.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
       "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
@@ -6022,6 +6031,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
     getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR).getOrElse(!ansiEnabled)
   }

+  def respectUserSchemaNullabilityForFileDataSources: Boolean =
+    getConf(SQLConf.RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES)
+
   def isReplEagerEvalEnabled: Boolean = getConf(SQLConf.REPL_EAGER_EVAL_ENABLED)

   def replEagerEvalMaxNumRows: Int = getConf(SQLConf.REPL_EAGER_EVAL_MAX_NUM_ROWS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 4eee731e0b2d6..c9fb0db2664d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
@@ -74,6 +75,7 @@ abstract class FileTable(
     }
     fileIndex match {
       case _: MetadataLogFileIndex => schema
+      case _ if SQLConf.get.respectUserSchemaNullabilityForFileDataSources => schema
       case _ => schema.asNullable
     }
   }
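// --- Illustrative usage sketch (not part of the patch) ------------------------------------
// A minimal, hypothetical example of how the new flag is intended to be exercised, mirroring
// the JsonSuite test added below. The session setup and the input path are assumptions for
// illustration only; only the config key and the reader API come from the patch itself.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object RespectUserSchemaNullabilityExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Opt in to keeping the user-specified nullability for file-based sources.
    spark.conf.set("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath", "true")

    val schema = StructType(Seq(
      StructField("f1", LongType, nullable = true),
      StructField("f2", LongType, nullable = false)))

    // With the flag enabled, f2 stays non-nullable in df.schema instead of being forced to
    // nullable; rows where f2 is missing or null are expected to fail at read time.
    val df = spark.read.schema(schema).json("/tmp/input.json") // hypothetical path
    df.printSchema()
  }
}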
missing + """ {"f1": 1200, "f2": null} """, // Field is null + ) + + invalidFiles.foreach(invalidFileContent => { + withTempPath(file => { + Seq(invalidFileContent).toDF() + .repartition(1) + .write + .text(file.getAbsolutePath) + + val df = spark.read + .schema(schema) + .json(file.getAbsolutePath) + + val schemaStr = df.schema.treeString + assert(schemaStr.contains("f2: string (nullable = false)")) + + assertThrows[SparkException] { + df.collect() + } + }) + }) + + // Sequence of tuples, where first tuple element is file content, + // while second tuple element is expected value after parsing + val validFiles = Seq( + (""" {"f1": 1200, "f2": 1000} """, 1000), // Non default value for non-nullable field f2 + (""" {"f1": 1200, "f2": 0} """, 0), // Default value for non-nullable filed f2 + ) + + validFiles.foreach { + case (validFileContent, expectedFieldValue) => + withTempPath(file => { + Seq(validFileContent).toDF() + .repartition(1) + .write + .text(file.getAbsolutePath) + + val df = spark.read + .schema(schema) + .json(file.getAbsolutePath) + + val schemaStr = df.schema.treeString + assert(schemaStr.contains("f2: string (nullable = false)")) + + checkAnswer(df, Row(1200, expectedFieldValue)) + }) + } + } + } + test("SPARK-36379: proceed parsing with root nulls in permissive mode") { val exception = intercept[SparkException] { spark.read.option("mode", "failfast")