From ce5c33eb2850ca5de73b25b26261f9054acbcc94 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Tue, 4 Oct 2022 17:12:24 +1300
Subject: [PATCH 1/3] update code

---
 .../sql/catalyst/json/JacksonParser.scala     | 24 ++++++--
 .../apache/spark/sql/JsonFunctionsSuite.scala | 59 ++++++++++++++++++-
 .../datasources/json/JsonSuite.scala          | 22 +++++++
 3 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f8adac1ee44fe..d5cdbda8569e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -456,7 +456,7 @@ class JacksonParser(
           schema.existenceDefaultsBitmask(index) = false
         } catch {
           case e: SparkUpgradeException => throw e
-          case NonFatal(e) if isRoot =>
+          case NonFatal(e) =>
             badRecordException = badRecordException.orElse(Some(e))
             parser.skipChildren()
         }
@@ -500,13 +500,27 @@ class JacksonParser(
       fieldConverter: ValueConverter,
       isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      val v = fieldConverter.apply(parser)
-      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
-      values += v
+      try {
+        val v = fieldConverter.apply(parser)
+        if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+        values += v
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+      }
     }
 
-    new GenericArrayData(values.toArray)
+    val arrayData = new GenericArrayData(values.toArray)
+
+    if (badRecordException.isEmpty) {
+      arrayData
+    } else {
+      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 83bdd247ccecc..7535125527a5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -854,11 +854,68 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
     checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
     val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
-    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     val df4 = Seq("""{"c2": [19]}""").toDF("c0")
     checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
   }
 
+  test("SPARK-40646: return partial results for JSON arrays with objects") {
+    val st = new StructType()
+      .add("c1", StringType)
+      .add("c2", ArrayType(new StructType().add("a", LongType)))
+
+    // Value of "c2.a" is a string instead of a long.
+    val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row("abc", null)))
+    )
+  }
+
+  test("SPARK-40646: return partial results for JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(IntegerType))
+
+    // Values in the array are strings instead of integers.
+    val df = Seq("""["a", "b", "c"]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for nested JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(ArrayType(IntegerType)))
+
+    // The second array contains a string instead of an integer.
+    val df = Seq("""[[1], ["2"]]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for objects with values as JSON arrays") {
+    val st = new StructType()
+      .add("c1",
+        ArrayType(
+          StructType(
+            StructField("c2", ArrayType(IntegerType)) ::
+            Nil
+          )
+        )
+      )
+
+    // Value "a" cannot be parsed as an integer;
+    // the error cascades to "c2", thus making its value null.
+    val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row(null)))
+    )
+  }
+
   test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
     val in = Seq("""{"a b": 1}""").toDS()
     val out = in.select(from_json($"value", schema_of_json("""{"a b": 100}""")) as "parsed")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 212d590813720..b5322b665a51c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3381,6 +3381,28 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-40646: parse subsequent fields if the first JSON field does not match schema") {
+    // In this example, the first record has "a.y" as boolean but it needs to be an object.
+    // We should parse "a" as null but continue parsing "b" correctly as it is valid.
+    withTempPath { path =>
+      Seq(
+        """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
+        """{"a": {"x": 2}, "b": {"x": 2}}""",
+      ).toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val df = spark.read
+        .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        df,
+        Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+      )
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
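The standalone snippet below illustrates the behavioral change in PATCH 1/3. It is a sketch, not part of the patch: it assumes a Spark build with the patch applied, and it reconstructs the schema "st" from the surrounding SPARK-33134-era test, since the definition of "st" lies outside the hunk shown above.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.from_json
  import org.apache.spark.sql.types._

  object PartialArrayResultsDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      // Assumed shape of the test schema: "c2" must be an array of structs,
      // so the element [19] fails to convert while "c1" parses fine.
      val st = new StructType()
        .add("c1", LongType)
        .add("c2", ArrayType(new StructType().add("a", LongType)))

      val df = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")

      // Before the patch this prints null; with the patch the array keeps
      // the partially parsed element: [{123456, null}].
      df.select(from_json($"c0", ArrayType(st))).show(false)
    }
  }

The same input previously collapsed to a single null because convertArray had no way to surface a partially parsed element; with this change, PartialResultException carries the partial row up through the converter stack instead.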
From d250eda2d9d3efbe17b3367aee682102a0ed068c Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Tue, 4 Oct 2022 18:38:02 +1300
Subject: [PATCH 2/3] fix scalastyle

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index b5322b665a51c..2dbae0a9fcdb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3387,9 +3387,8 @@ abstract class JsonSuite
     // We should parse "a" as null but continue parsing "b" correctly as it is valid.
     withTempPath { path =>
       Seq(
-        """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
-        """{"a": {"x": 2}, "b": {"x": 2}}""",
-      ).toDF()
+        """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
+        """{"a": {"x": 2}, "b": {"x": 2}}""").toDF()
         .repartition(1)
         .write.text(path.getAbsolutePath)
 
From ffd336f3018324ca26812c263adafc8ed3ff848d Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Fri, 14 Oct 2022 11:56:22 +1300
Subject: [PATCH 3/3] handle maps

---
 .../sql/catalyst/json/JacksonParser.scala     | 21 +++++++++++++++++--
 .../apache/spark/sql/JsonFunctionsSuite.scala | 15 ++++++++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d5cdbda8569e6..609fe9bc903a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -482,14 +482,31 @@ class JacksonParser(
       fieldConverter: ValueConverter): MapData = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
+      try {
+        values += fieldConverter.apply(parser)
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+        case NonFatal(e) =>
+          badRecordException = badRecordException.orElse(Some(e))
+          parser.skipChildren()
+      }
     }
 
     // The JSON map will never have null or duplicated map keys, it's safe to create a
     // ArrayBasedMapData directly here.
-    ArrayBasedMapData(keys.toArray, values.toArray)
+    val mapData = ArrayBasedMapData(keys.toArray, values.toArray)
+
+    if (badRecordException.isEmpty) {
+      mapData
+    } else {
+      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index ba5714ce85537..e3c61fe1b2569 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -863,7 +863,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       .add("c1", StringType)
       .add("c2", ArrayType(new StructType().add("a", LongType)))
 
-    // Value of "c2.a" is a string instead of a long.
+    // "c2" is expected to be an array of structs but it is a struct in the data.
     val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
     checkAnswer(
       df.select(from_json($"c0", ArrayType(st))),
@@ -870,7 +870,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(Array(Row("abc", null)))
     )
   }
 
+  test("SPARK-40646: return partial results for JSON maps") {
+    val st = new StructType()
+      .add("c1", MapType(StringType, IntegerType))
+      .add("c2", StringType)
+
+    // Map "c1" has key "k2" whose value is a string, not an integer.
+    val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", st)),
+      Row(Row(null, "abc"))
+    )
+  }
+
   test("SPARK-40646: return partial results for JSON arrays") {
     val st = new StructType()
       .add("c", ArrayType(IntegerType))
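To close, a companion sketch for the map case handled by PATCH 3/3, under the same assumptions as the array example after PATCH 1/3: it presumes a build that includes this patch, and the input, schema, and expected result are read directly off the new "return partial results for JSON maps" test.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.from_json
  import org.apache.spark.sql.types._

  object PartialMapResultsDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      val st = new StructType()
        .add("c1", MapType(StringType, IntegerType))
        .add("c2", StringType)

      // "k2" cannot be converted to an integer, so the whole map "c1" comes
      // back null, while the sibling string field "c2" is still parsed.
      val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
      df.select(from_json($"c0", st)).show(false)  // prints {null, abc}
    }
  }

Note the asymmetry the tests pin down: a bad value inside a map nulls out the map itself, but convertMap rethrows PartialResultException, so convertObject can catch it and keep parsing the remaining fields of the enclosing object.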