2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1674,7 +1674,7 @@ test_that("column functions", {

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)

# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.

- In Spark version 2.4 and earlier, the `from_json` function produces `null`s, and the JSON datasource skips such records regardless of its mode, if there is no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.

- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

## Upgrading From Spark SQL 2.3 to 2.4
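A minimal Scala sketch of the `from_json` mode change described in the migration-guide entries above; the session setup, column name, and printed results are illustrative assumptions and not part of this PR:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("from_json-modes").getOrCreate()
import spark.implicits._

// Schema from the migration-guide example: key STRING, value INT.
val schema = new StructType().add("key", StringType).add("value", IntegerType)
val df = Seq(" ").toDF("json") // whitespace-only input: no root JSON token

// Spark 3.0, PERMISSIVE (default): the input is treated as a bad record and
// yields Row(null, null); in 2.4 and earlier the whole result was null.
df.select(from_json($"json", schema)).show(false)

// FAILFAST: the same input makes the query fail with a parsing exception.
df.select(from_json($"json", schema, Map("mode" -> "FAILFAST"))).show(false)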
@@ -550,15 +550,23 @@ case class JsonToStructs(
s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
}

// This converts parsed rows to the desired output by the given schema.
@transient
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
case _: ArrayType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
case _: MapType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
private lazy val castRow = nullableSchema match {
case _: StructType => (row: InternalRow) => row
case _: ArrayType => (row: InternalRow) => row.getArray(0)
case _: MapType => (row: InternalRow) => row.getMap(0)
}

// This converts parsed rows to the desired output by the given schema.
private def convertRow(rows: Iterator[InternalRow]) = {
if (rows.hasNext) {
val result = rows.next()
// JSON's parser produces one record only.
assert(!rows.hasNext)
castRow(result)
} else {
throw new IllegalArgumentException("Expected one row from JSON parser.")
Contributor: This can only happen when we have a bug, right?

Member Author: Right, it must not happen.
}
}

val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -593,7 +601,7 @@ case class JsonToStructs(
copy(timeZoneId = Option(timeZoneId))

override def nullSafeEval(json: Any): Any = {
converter(parser.parse(json.asInstanceOf[UTF8String]))
convertRow(parser.parse(json.asInstanceOf[UTF8String]))
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
@@ -399,7 +399,7 @@ class JacksonParser(
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
case null => Nil
case null => throw new RuntimeException("Not found any JSON token")
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
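A standalone sketch (not code from this PR) of the Jackson behaviour the comment above relies on: for whitespace-only input there is no root JSON token, so nextToken() returns null, which this change now surfaces as a bad record instead of silently returning Nil:

import com.fasterxml.jackson.core.JsonFactory

// Whitespace-only input contains no root JSON token.
val factory = new JsonFactory()
val parser = factory.createParser(" ")
assert(parser.nextToken() == null) // equivalent to input.trim.isEmpty for string input
parser.close()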
@@ -547,7 +547,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
null
InternalRow(null)
)
}

10 changes: 0 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row("1"), Row("2")))
}

test("SPARK-11226 Skip empty line in json file") {
Member Author: I removed the test because it is no longer relevant to the default PERMISSIVE mode, and SQLQuerySuite is not the right place for it.

Member: Where has it been moved to, then? Does that mean we no longer have a regression test for SPARK-11226?

spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")

checkAnswer(
sql("select count(1) from d"),
Seq(Row(3)))
}

test("SPARK-8828 sum should return null if all input values are null") {
checkAnswer(
sql("select sum(a), avg(a) from allNulls"),
@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Contributor: So for the JSON data source, the previous behavior was to skip the row even in PERMISSIVE mode. Shall we clearly mention that in the migration guide?

Member Author: Yes, we skipped such rows whenever the Jackson parser wasn't able to find any root token, so not only empty strings and whitespace fell into that category. And sure, I'll mention it in the migration guide.

Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
@@ -1136,6 +1137,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1150,6 +1152,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
Row("{") ::
Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil
@@ -1171,6 +1174,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.selectExpr("a", "b", "c", "_malformed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.repartition(1)
Contributor: Why is the repartition required?

Member Author: As far as I remember, I added the repartition(1) here and in other places to eliminate empty files. Such empty files are produced by empty partitions. We could probably avoid writing empty files, at least for text-based datasources, but in any case let's look at TextOutputWriter, for example. It creates an output stream for the file in its constructor:

private val writer = CodecStreams.createOutputStream(context, new Path(path))

and closes the (possibly empty) file in close(). So, even if we don't write anything to the file, an empty file is created.

From the read side, when the Jackson parser tries to read the empty file, it cannot detect any JSON token at the root level and returns null from nextToken(), for which I now throw a bad record exception -> Row(...) in PERMISSIVE mode.

.write
.option("compression", "GzIp")
.text(path)
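A hedged sketch of the empty-partition issue discussed in the repartition thread above; the paths and sample rows are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("empty-part-files").getOrCreate()
import spark.implicits._

// With more partitions than rows, some partitions are empty, and each empty
// partition still produces an empty part file (TextOutputWriter opens the
// output stream in its constructor and closes it even if nothing is written).
val df = Seq("""{"a": 1}""", """{"a": 2}""").toDF("value")

df.repartition(8).write.text("/tmp/json-with-empty-parts") // may contain empty part files
df.repartition(1).write.text("/tmp/json-single-part")      // exactly one non-empty part file

// After this change, an empty part file read back through the JSON source has no
// root token, so PERMISSIVE mode produces an extra all-null (bad record) row.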
@@ -1838,6 +1843,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.repartition(1)
.write
.text(path)

@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.text(path)

val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
assert(jsonDF.count() === corruptRecordCount)
assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
Member: Wait, does this mean that it reads an empty record from an empty file after this change?

Member: If that's true, we should not do this. Empty files can currently be generated in many cases, and the behaviour is not well defined. If we rely on this behaviour, it will cause weird behaviours or bugs that are hard to fix.

Contributor: Shall we skip empty files for all the file-based data sources?

assert(jsonDF.schema === new StructType()
.add("_corrupt_record", StringType)
.add("dummy", StringType))
@@ -1905,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
checkAnswer(counts, Row(1, 4, 6))
checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
}
}

@@ -2513,7 +2519,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

checkCount(2)
countForMalformedJSON(0, Seq(""))
countForMalformedJSON(1, Seq(""))
}

test("SPARK-25040: empty strings should be disallowed") {