@@ -3902,6 +3902,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES =
buildConf("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath")
.doc("When true, the nullability in the user-specified schema for " +
"`DataFrameReader.schema(schema).json(path)` and .csv(path) and .xml(path) is respected" +
"Otherwise, they are turned to a nullable schema forcibly.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
.doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
"displayed if and only if the REPL supports the eager evaluation. Currently, the " +
@@ -6022,6 +6031,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR).getOrElse(!ansiEnabled)
}

def respectUserSchemaNullabilityForFileDataSources: Boolean =
getConf(SQLConf.RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES)

def isReplEagerEvalEnabled: Boolean = getConf(SQLConf.REPL_EAGER_EVAL_ENABLED)

def replEagerEvalMaxNumRows: Int = getConf(SQLConf.REPL_EAGER_EVAL_MAX_NUM_ROWS)
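A minimal sketch of how the new flag is meant to be used from the read path (hedged: the session setup, field names, and /tmp path are illustrative; the conf key is the one defined in the hunk above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object RespectNullabilityExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("respect-user-schema-nullability")
      // Conf key as defined in the diff above; it defaults to false.
      .config("spark.sql.respectUserSchemaNullabilityForFileDataSourceWithFilePath", "true")
      .getOrCreate()

    // User-specified schema with one non-nullable field.
    val schema = StructType(Seq(
      StructField("f1", LongType, nullable = true),
      StructField("f2", LongType, nullable = false)
    ))

    // With the flag on, the resulting schema keeps f2 as nullable = false;
    // with the default (false), Spark relaxes it to nullable = true.
    val df = spark.read.schema(schema).json("/tmp/events.json")
    df.printSchema()
  }
}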
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
@@ -74,6 +75,7 @@ abstract class FileTable(
}
fileIndex match {
case _: MetadataLogFileIndex => schema
case _ if SQLConf.get.respectUserSchemaNullabilityForFileDataSources => schema

Member:
why changing the V2 only?


Contributor Author:
Other code paths are covered in DataFrameReader: if the flag LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION is set to true, schema nullability is preserved there.

case _ => schema.asNullable
}
}
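To ground the author's reply above: DataFrameReader.json(Dataset[String]) does not go through the V2 FileTable, and its nullability handling is governed by the pre-existing legacy flag. A hedged sketch of that path (the conf key is recalled from upstream Spark, not from this diff; the data is illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object TextDatasetConversionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("legacy-text-dataset-nullability")
      // Pre-existing flag covering the Dataset[String] conversion path.
      .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
      .getOrCreate()
    import spark.implicits._

    val schema = StructType(Seq(
      StructField("f1", LongType, nullable = false)
    ))

    // json(Dataset[String]) never reaches FileTable.schema above, which is
    // why the V2 path needed its own check for the new flag.
    val ds = Seq("""{"f1": 1}""").toDS()
    val df = spark.read.schema(schema).json(ds)
    df.printSchema() // f1 stays nullable = false while the legacy flag is true
  }
}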
@@ -3354,6 +3354,66 @@ abstract class JsonSuite
}
}

test("SPARK-49893: Validate user schema nullability is respected when file is read") {
withSQLConf(SQLConf.RESPECT_USER_SCHEMA_NULLABILITY_FOR_FILE_DATA_SOURCES.key -> "true") {
val schema = StructType(Seq(
StructField("f1", LongType, nullable = true),
StructField("f2", LongType, nullable = false)
))

val invalidFiles = Seq(
""" {"f1": 1200} """, // non-nullable field f2 is missing
""" {"f1": 1200, "f2": null} """ // non-nullable field f2 is null
)

invalidFiles.foreach(invalidFileContent => {
withTempPath(file => {
Seq(invalidFileContent).toDF()
.repartition(1)
.write
.text(file.getAbsolutePath)

val df = spark.read
.schema(schema)
.json(file.getAbsolutePath)

val schemaStr = df.schema.treeString
assert(schemaStr.contains("f2: string (nullable = false)"))

assertThrows[SparkException] {
df.collect()
}
})
})

// Sequence of tuples: the first element is the file content,
// the second is the expected value of f2 after parsing
val validFiles = Seq(
(""" {"f1": 1200, "f2": 1000} """, 1000L), // non-default value for non-nullable field f2
(""" {"f1": 1200, "f2": 0} """, 0L) // default value for non-nullable field f2
)

validFiles.foreach {
case (validFileContent, expectedFieldValue) =>
withTempPath(file => {
Seq(validFileContent).toDF()
.repartition(1)
.write
.text(file.getAbsolutePath)

val df = spark.read
.schema(schema)
.json(file.getAbsolutePath)

val schemaStr = df.schema.treeString
assert(schemaStr.contains("f2: string (nullable = false)"))

checkAnswer(df, Row(1200L, expectedFieldValue))
})
}
}
}
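
For contrast with the test above, a hedged sketch of the default behavior (flag left at false): the non-nullable user schema is silently relaxed, so a record missing f2 reads back as null instead of failing (paths and data are illustrative):

import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object DefaultRelaxationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("default-nullable-relaxation")
      .getOrCreate()
    import spark.implicits._

    // Write a record that is missing f2, then read it back with a schema
    // declaring f2 non-nullable.
    val path = Files.createTempDirectory("json-demo").resolve("data").toString
    Seq("""{"f1": 1200}""").toDF("value").write.text(path)

    val schema = StructType(Seq(
      StructField("f1", LongType, nullable = true),
      StructField("f2", LongType, nullable = false)
    ))

    val df = spark.read.schema(schema).json(path)
    df.printSchema() // f2 is relaxed to nullable = true by default
    df.show()        // the row reads back as (1200, null) rather than erroring
  }
}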

test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
val exception = intercept[SparkException] {
spark.read.option("mode", "failfast")