docs/sql-migration-guide.md (4 additions, 0 deletions)
@@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 3.2 to 3.3

- Since Spark 3.3, Spark turns a non-nullable schema into a nullable one for the APIs `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
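
For illustration, a minimal spark-shell sketch of the schema change this note describes; the printed schema is hedged from the behavior stated above (before 3.3 the field would report `nullable = false`):

```scala
import org.apache.spark.sql.types._

// spark.implicits._ is in scope in spark-shell, enabling .toDS
val jsonDS = Seq("""{"c1": 1}""").toDS
val userSchema = StructType(StructField("c1", IntegerType, nullable = false) :: Nil)

// Since Spark 3.3 the user-specified schema is converted to its nullable form:
spark.read.schema(userSchema).json(jsonDS).printSchema()
// root
//  |-- c1: integer (nullable = true)
```
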
**Member** commented:

I actually underestimated this problem; it can change the results:

import org.apache.spark.sql.types._
val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
  ).option("mode", "FAILFAST").csv(ds).show()

Before:

+---+---+
| f1| f2|
+---+---+
|  a|  b|
+---+---+

After:

+---+----+
| f1|  f2|
+---+----+
|  a|null|
|  a|   b|
+---+----+

I think we should at least add a legacy configuration... let me make a quick follow-up.
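
For context, the follow-up the reviewer mentions would presumably gate the conversion behind a legacy flag. A minimal sketch, assuming a boolean config; the flag name (e.g. `spark.sql.legacy.respectNullabilityInTextDatasetConversion`) and the helper are assumptions, not part of this diff:

```scala
import org.apache.spark.sql.types.StructType

// Hedged sketch: choose the schema policy based on an assumed legacy flag.
// true  -> pre-3.3 behavior: keep the user's nullability as-is
// false -> 3.3 behavior: force every top-level field to nullable
def applySchemaPolicy(user: StructType, legacyRespectNullability: Boolean): StructType =
  if (legacyRespectNullability) user
  else StructType(user.fields.map(_.copy(nullable = true)))
```
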


## Upgrading from Spark SQL 3.1 to 3.2

- Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespace.
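
For illustration, quoting such a path via the SQL API (the path itself is made up):

```scala
// Since 3.2 the quotes are required because the path contains spaces.
// Illustrative path only.
spark.sql("ADD FILE '/tmp/path with spaces/data.txt'")
```
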
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -449,7 +449,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)

-    val schema = userSpecifiedSchema.getOrElse {
+    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
       TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
     }

@@ -521,7 +521,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       None
     }

-    val schema = userSpecifiedSchema.getOrElse {
+    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
       TextInputCSVDataSource.inferFromDataset(
         sparkSession,
         csvDataset,
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.{SparkConf, SparkException, TestUtils}
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
@@ -2463,6 +2463,26 @@ abstract class CSVSuite
        .option("ignoreTrailingWhiteSpace", "true").load(path.getAbsolutePath).count() == 1)
    }
  }

test("SPARK-35912: turn non-nullable schema into a nullable schema") {
val inputCSVString = """1,"""

val schema = StructType(Seq(
StructField("c1", IntegerType, nullable = false),
StructField("c2", IntegerType, nullable = false)))
val expected = schema.asNullable

Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode =>
val csv = spark.createDataset(
spark.sparkContext.parallelize(inputCSVString:: Nil))(Encoders.STRING)
val df = spark.read
.option("mode", mode)
.schema(schema)
.csv(csv)
assert(df.schema == expected)
checkAnswer(df, Row(1, null) :: Nil)
}
}
}

class CSVv1Suite extends CSVSuite {
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2919,6 +2919,31 @@ abstract class JsonSuite
      }
    }
  }

test("SPARK-35912: turn non-nullable schema into a nullable schema") {
// JSON field is missing.
val missingFieldInput = """{"c1": 1}"""
// JSON filed is null.
val nullValueInput = """{"c1": 1, "c2": null}"""

val schema = StructType(Seq(
StructField("c1", IntegerType, nullable = false),
StructField("c2", IntegerType, nullable = false)))
val expected = schema.asNullable

Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode =>
val json = spark.createDataset(
spark.sparkContext.parallelize(jsonString:: Nil))(Encoders.STRING)
val df = spark.read
.option("mode", mode)
.schema(schema)
.json(json)
assert(df.schema == expected)
checkAnswer(df, Row(1, null) :: Nil)
}
}
}
}

class JsonV1Suite extends JsonSuite {