diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f8adac1ee44fe..609fe9bc903a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -456,7 +456,7 @@ class JacksonParser(
             schema.existenceDefaultsBitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
-            case NonFatal(e) if isRoot =>
+            case NonFatal(e) =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
           }
@@ -482,14 +482,31 @@ class JacksonParser(
       fieldConverter: ValueConverter): MapData = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
+      try {
+        values += fieldConverter.apply(parser)
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+        case NonFatal(e) =>
+          badRecordException = badRecordException.orElse(Some(e))
+          parser.skipChildren()
+      }
     }
 
     // The JSON map will never have null or duplicated map keys, it's safe to create a
     // ArrayBasedMapData directly here.
-    ArrayBasedMapData(keys.toArray, values.toArray)
+    val mapData = ArrayBasedMapData(keys.toArray, values.toArray)
+
+    if (badRecordException.isEmpty) {
+      mapData
+    } else {
+      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+    }
   }
 
   /**
@@ -500,13 +517,27 @@ class JacksonParser(
       fieldConverter: ValueConverter,
       isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      val v = fieldConverter.apply(parser)
-      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
-      values += v
+      try {
+        val v = fieldConverter.apply(parser)
+        if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+        values += v
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+      }
     }
 
-    new GenericArrayData(values.toArray)
+    val arrayData = new GenericArrayData(values.toArray)
+
+    if (badRecordException.isEmpty) {
+      arrayData
+    } else {
+      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+    }
   }
 
   /**
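For context, the user-visible effect of the parser change above can be sketched as follows. This is a minimal, illustrative snippet (the SparkSession setup, schema, and data are not part of the patch); the expected value mirrors the updated `df3` assertion in the test suite below:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object PartialResultsSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
  import spark.implicits._

  // "c2" should be an array of longs, but the data carries a string,
  // so only that field fails to convert.
  val st = new StructType().add("c1", LongType).add("c2", ArrayType(LongType))
  val df = Seq("""[{"c1": 123456, "c2": "oops"}]""").toDF("c0")

  // Previously the whole root array became null; with this change the
  // malformed field is nulled out and the rest of the struct is kept,
  // i.e. the expected value is [[123456, null]].
  df.select(from_json($"c0", ArrayType(st))).show(false)

  spark.stop()
}
```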
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index f3dbf44cd6c71..e3c61fe1b2569 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -853,11 +853,81 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
     checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
     val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
-    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     val df4 = Seq("""{"c2": [19]}""").toDF("c0")
     checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
   }
 
+  test("SPARK-40646: return partial results for JSON arrays with objects") {
+    val st = new StructType()
+      .add("c1", StringType)
+      .add("c2", ArrayType(new StructType().add("a", LongType)))
+
+    // "c2" is expected to be an array of structs but it is a struct in the data.
+    val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row("abc", null)))
+    )
+  }
+
+  test("SPARK-40646: return partial results for JSON maps") {
+    val st = new StructType()
+      .add("c1", MapType(StringType, IntegerType))
+      .add("c2", StringType)
+
+    // Map "c1" has "k2" key that is a string, not an integer.
+    val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", st)),
+      Row(Row(null, "abc"))
+    )
+  }
+
+  test("SPARK-40646: return partial results for JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(IntegerType))
+
+    // Values in the array are strings instead of integers.
+    val df = Seq("""["a", "b", "c"]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for nested JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(ArrayType(IntegerType)))
+
+    // The second array contains a string instead of an integer.
+    val df = Seq("""[[1], ["2"]]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for objects with values as JSON arrays") {
+    val st = new StructType()
+      .add("c1",
+        ArrayType(
+          StructType(
+            StructField("c2", ArrayType(IntegerType)) ::
+            Nil
+          )
+        )
+      )
+
+    // Value "a" cannot be parsed as an integer,
+    // the error cascades to "c2", thus making its value null.
+    val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row(null)))
+    )
+  }
+
   test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
     val in = Seq("""{"a b": 1}""").toDS()
     val out = in.select(from_json($"value", schema_of_json("""{"a b": 100}""")) as "parsed")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f3210b049d483..a1722d1b09c14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3381,6 +3381,27 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-40646: parse subsequent fields if the first JSON field does not match schema") {
+    // In this example, the first record has "a.y" as boolean but it needs to be an object.
+    // We should parse "a" as null but continue parsing "b" correctly as it is valid.
+    withTempPath { path =>
+      Seq(
+        """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
+        """{"a": {"x": 2}, "b": {"x": 2}}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val df = spark.read
+        .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        df,
+        Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+      )
+    }
+  }
+
   test("SPARK-40667: validate JSON Options") {
     assert(JSONOptions.getAllOptions.size == 28)
     // Please add validation on any new Json options here
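The same behaviour exercised by the new JsonSuite test can be reproduced outside the test harness. A minimal sketch, assuming a local SparkSession (temp-dir handling and object name are illustrative); the expected rows are taken from the test's `checkAnswer` above:

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object JsonPartialReadSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
  import spark.implicits._

  // Write two JSON records; "a.y" is a boolean in the first one even
  // though the schema below declares it as a struct.
  val dir = Files.createTempDirectory("spark-40646").resolve("data").toString
  Seq(
    """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
    """{"a": {"x": 2}, "b": {"x": 2}}""").toDF().write.text(dir)

  // Expected with this patch: Row(null, Row(1)) for the first record
  // ("a" is dropped, "b" survives) and Row(Row(2, null), Row(2)) for the second.
  spark.read
    .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
    .json(dir)
    .show()

  spark.stop()
}
```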