JacksonParser.scala

@@ -456,7 +456,7 @@ class JacksonParser(
            schema.existenceDefaultsBitmask(index) = false
          } catch {
            case e: SparkUpgradeException => throw e
-           case NonFatal(e) if isRoot =>
+           case NonFatal(e) =>
Member:

If you allow this, please check all complex types (map, struct). Or have you already added some tests?
Contributor Author:

I have tried covering different cases in:

  • SPARK-40646: return partial results for JSON arrays with objects
  • SPARK-40646: return partial results for objects with values as JSON arrays

I think I am missing a test for maps; I will add one, thanks.
Contributor Author:

I added a test!

              badRecordException = badRecordException.orElse(Some(e))
              parser.skipChildren()
          }
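For a quick sense of what relaxing the `isRoot` guard buys end users, here is a hedged spark-shell sketch mirroring the new "JSON arrays with objects" test added below (it assumes `spark.implicits._` is in scope, as in spark-shell):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// "c2" is declared as an array of structs but the data holds a bare struct.
// The element is still returned with "c1" populated, so the expected answer
// is Row(Array(Row("abc", null))) rather than a null array column.
val st = new StructType()
  .add("c1", StringType)
  .add("c2", ArrayType(new StructType().add("a", LongType)))

Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
  .select(from_json($"c0", ArrayType(st)))
  .show(false)
```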
@@ -482,14 +482,31 @@ class JacksonParser(
       fieldConverter: ValueConverter): MapData = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
+      try {
+        values += fieldConverter.apply(parser)
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+        case NonFatal(e) =>
+          badRecordException = badRecordException.orElse(Some(e))
+          parser.skipChildren()
+      }
     }

     // The JSON map will never have null or duplicated map keys, it's safe to create an
     // ArrayBasedMapData directly here.
-    ArrayBasedMapData(keys.toArray, values.toArray)
+    val mapData = ArrayBasedMapData(keys.toArray, values.toArray)
+
+    if (badRecordException.isEmpty) {
+      mapData
+    } else {
+      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+    }
   }
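To see the map branch end to end, a minimal sketch matching the new map test below (same spark-shell assumptions as above):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Value "A" under key "k2" cannot be parsed as an integer, so the whole map
// column becomes null while the sibling string column survives: the expected
// answer is Row(Row(null, "abc")) instead of a fully null row.
val schema = new StructType()
  .add("c1", MapType(StringType, IntegerType))
  .add("c2", StringType)

Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
  .select(from_json($"c0", schema))
  .show(false)
```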

   /**

@@ -500,13 +500,27 @@
       fieldConverter: ValueConverter,
       isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      val v = fieldConverter.apply(parser)
-      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
-      values += v
+      try {
+        val v = fieldConverter.apply(parser)
+        if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+        values += v
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+      }
     }

-    new GenericArrayData(values.toArray)
+    val arrayData = new GenericArrayData(values.toArray)
+
+    if (badRecordException.isEmpty) {
+      arrayData
+    } else {
+      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+    }
   }
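And the array-level counterpart, again as a sketch; the element schema `st` here is illustrative (the suite's own `st` may differ), chosen so it reproduces the updated `df3` expectation below:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// "c2" is an integer in the schema but an array in the data, so it nulls out
// while "c1" is kept: expect Row(Array(Row(123456, null))) rather than the
// pre-change Row(null) for the whole array column.
val st = new StructType().add("c1", LongType).add("c2", IntegerType)

Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
  .select(from_json($"c0", ArrayType(st)))
  .show(false)
```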

   /**

JsonFunctionsSuite.scala

@@ -853,11 +853,81 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
     checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
     val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
-    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     val df4 = Seq("""{"c2": [19]}""").toDF("c0")
     checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
   }

test("SPARK-40646: return partial results for JSON arrays with objects") {
val st = new StructType()
.add("c1", StringType)
.add("c2", ArrayType(new StructType().add("a", LongType)))

// "c2" is expected to be an array of structs but it is a struct in the data.
val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
checkAnswer(
df.select(from_json($"c0", ArrayType(st))),
Row(Array(Row("abc", null)))
)
}

test("SPARK-40646: return partial results for JSON maps") {
val st = new StructType()
.add("c1", MapType(StringType, IntegerType))
.add("c2", StringType)

// Map "c2" has "k2" key that is a string, not an integer.
val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
checkAnswer(
df.select(from_json($"c0", st)),
Row(Row(null, "abc"))
)
}

test("SPARK-40646: return partial results for JSON arrays") {
val st = new StructType()
.add("c", ArrayType(IntegerType))

// Values in the array are strings instead of integers.
val df = Seq("""["a", "b", "c"]""").toDF("c0")
checkAnswer(
df.select(from_json($"c0", ArrayType(st))),
Row(null)
)
}

test("SPARK-40646: return partial results for nested JSON arrays") {
val st = new StructType()
.add("c", ArrayType(ArrayType(IntegerType)))

// The second array contains a string instead of an integer.
val df = Seq("""[[1], ["2"]]""").toDF("c0")
checkAnswer(
df.select(from_json($"c0", ArrayType(st))),
Row(null)
)
}

test("SPARK-40646: return partial results for objects with values as JSON arrays") {
val st = new StructType()
.add("c1",
ArrayType(
StructType(
StructField("c2", ArrayType(IntegerType)) ::
Nil
)
)
)

// Value "a" cannot be parsed as an integer,
// the error cascades to "c2", thus making its value null.
val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
checkAnswer(
df.select(from_json($"c0", ArrayType(st))),
Row(Array(Row(null)))
)
}

test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
val in = Seq("""{"a b": 1}""").toDS()
val out = in.select(from_json($"value", schema_of_json("""{"a b": 100}""")) as "parsed")
Expand Down
JsonSuite.scala

@@ -3381,6 +3381,27 @@ abstract class JsonSuite
     }
   }

test("SPARK-40646: parse subsequent fields if the first JSON field does not match schema") {
// In this example, the first record has "a.y" as boolean but it needs to be an object.
// We should parse "a" as null but continue parsing "b" correctly as it is valid.
withTempPath { path =>
Seq(
"""{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
"""{"a": {"x": 2}, "b": {"x": 2}}"""").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)

val df = spark.read
.schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
.json(path.getAbsolutePath)

checkAnswer(
df,
Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
)
}
}
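One follow-up reviewers may want to try by hand (a sketch only, not part of this diff): in the default PERMISSIVE mode, adding a corrupt-record column to the schema should surface the raw text of the partially parsed record next to the recovered fields.

```scala
// Assumes the same `path` as the test above. The malformed first record keeps
// its partial result in "a"/"b" while the raw line lands in _corrupt_record;
// the well-formed second record leaves that column null.
val dfWithCorrupt = spark.read
  .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>, _corrupt_record string")
  .json(path.getAbsolutePath)

dfWithCorrupt.show(false)
```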

test("SPARK-40667: validate JSON Options") {
assert(JSONOptions.getAllOptions.size == 28)
// Please add validation on any new Json options here
Expand Down