From c2e7078bf1f0b753ad4ad81725f7e8af3a51a77e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 15:47:55 +0200 Subject: [PATCH 01/27] Support any type by FailureSafeParser --- .../sql/catalyst/util/FailureSafeParser.scala | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index fecfff5789a5c..1e816ce81af78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -20,43 +20,46 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, - schema: StructType, + schema: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. 
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { - if (corruptFieldIndex.isDefined) { - (row, badRecord) => { - var i = 0 - while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = schema match { + case struct: StructType => + val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) + val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) + val resultRow = new GenericInternalRow(struct.length) + val nullResult = new GenericInternalRow(struct.length) + if (corruptFieldIndex.isDefined) { + (row, badRecord) => { + var i = 0 + while (i < actualSchema.length) { + val from = actualSchema(i) + resultRow(struct.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull + i += 1 + } + resultRow(corruptFieldIndex.get) = badRecord() + resultRow } - resultRow(corruptFieldIndex.get) = badRecord() - resultRow + } else { + (row, _) => row.getOrElse(nullResult) } - } else { - (row, _) => row.getOrElse(nullResult) - } + case _ => (row, _) => row.getOrElse(new GenericInternalRow(1)) } - private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty + private val skipParsing = !isMultiLine && mode == PermissiveMode && (schema match { + case struct: StructType => struct.isEmpty + case _ => false + }) def parse(input: IN): Iterator[InternalRow] = { try { From fe2baa4fe8f9b2fb1ee8d14d0acc7ee9bc1cc626 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 16:56:18 +0200 Subject: [PATCH 02/27] Use FailureSafeParser in from_json --- .../expressions/jsonExpressions.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index b4815b47d1797..033650eea4398 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -553,19 +553,23 @@ case class JsonToStructs( // This converts parsed rows to the desired output by the given schema. 
@transient lazy val converter = nullableSchema match { - case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + case st: StructType => + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = - new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get) + @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions) + @transient lazy val createParser = CreateJacksonParser.utf8String _ + @transient lazy val parser = new FailureSafeParser[UTF8String]( + input => rawParser.parse(input, createParser, identity[UTF8String]), + parsedOptions.parseMode, + schema, + parsedOptions.columnNameOfCorruptRecord, + parsedOptions.multiLine) override def dataType: DataType = nullableSchema @@ -595,10 +599,7 @@ case class JsonToStructs( if (json.toString.trim.isEmpty) return null try { - converter(parser.parse( - json.asInstanceOf[UTF8String], - CreateJacksonParser.utf8String, - identity[UTF8String])) + converter(parser.parse(json.asInstanceOf[UTF8String])) } catch { case _: BadRecordException => null } From cecc8f558c9885b9e2e4655a1e6ba3c3a1408f8d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 17:02:37 +0200 Subject: [PATCH 03/27] Fix tests --- .../sql/catalyst/expressions/JsonExpressionsSuite.scala | 8 ++++---- .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index f31b294fe25d4..f6d8510a5ce8b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -409,13 +409,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId), - null + InternalRow(null) ) // Other modes should still return `null`. 
checkEvaluation( JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId), - null + InternalRow(null) ) } @@ -457,7 +457,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=array, schema=struct, output=null") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) - val output = null + val output = InternalRow(1) checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } @@ -487,7 +487,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( JsonToStructs(schema, Map.empty, Literal(badJson), gmtId), - null) + InternalRow(null)) } test("from_json with timestamp") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 5cbf10129a4da..4262b6789ea09 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -132,7 +132,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer( df.select(from_json($"value", schema)), - Row(null) :: Nil) + Row(Row(null)) :: Nil) } test("from_json - json doesn't conform to the array type") { From 75bdb033a76d74fae57ac05eb0be62c1d93d5166 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 17:05:16 +0200 Subject: [PATCH 04/27] Removing unused bind variable --- .../apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 033650eea4398..a2a56e2e13800 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -553,7 +553,7 @@ case class JsonToStructs( // This converts parsed rows to the desired output by the given schema. 
@transient lazy val converter = nullableSchema match { - case st: StructType => + case _: StructType => (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null From 7a8804da4d6d6cf5786038daff9a6991739d87cc Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 17:28:23 +0200 Subject: [PATCH 05/27] Adding tests for different modes --- .../apache/spark/sql/JsonFunctionsSuite.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 4262b6789ea09..4bbdd343386b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import collection.JavaConverters._ +import org.apache.spark.SparkException import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -547,4 +548,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Map("pretty" -> "true"))), Seq(Row(expected))) } + + test("from_json invalid json - check modes") { + val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() + val schema = new StructType().add("a", IntegerType) + + checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + + val exceptionOne = intercept[SparkException] { + checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))), + Row(Row(2)) :: Nil) + }.getMessage + assert(exceptionOne.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + + checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))), + Row(null) :: Row(Row(2)) :: Nil) + } } From 01b63f1abee289bccb9c68eec461217acf2e8b17 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 26 Aug 2018 17:43:25 +0200 Subject: [PATCH 06/27] Improve a test --- .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 4bbdd343386b5..504f6b8cb3d36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -558,9 +558,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Row(null)) :: Row(Row(2)) :: Nil) val exceptionOne = intercept[SparkException] { - checkAnswer( - df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))), - Row(Row(2)) :: Nil) + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() }.getMessage assert(exceptionOne.contains( "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST.")) From b1894d2e3e01f54a8962095778e3777643cbef76 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 27 Aug 2018 00:22:30 +0200 Subject: [PATCH 07/27] Fix a sql test --- .../typeCoercion/native/stringCastAndExpressions.sql.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out index ba9bf76513f97..31ee700a8db95 100644 --- a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out @@ -258,4 +258,4 @@ select from_json(a, 'a INT') from t -- !query 31 schema struct> -- !query 31 output -NULL +{"a":null} From b76b8d3ce51917a455f7e9c12dec656015161cec Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 28 Aug 2018 20:26:09 +0200 Subject: [PATCH 08/27] Addressing Takeshi's review comments --- .../expressions/jsonExpressions.scala | 21 +++++++++++-------- .../sql/catalyst/util/FailureSafeParser.scala | 6 +++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index a2a56e2e13800..728533a6925a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -561,15 +561,18 @@ case class JsonToStructs( (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get) - @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions) - @transient lazy val createParser = CreateJacksonParser.utf8String _ - @transient lazy val parser = new FailureSafeParser[UTF8String]( - input => rawParser.parse(input, createParser, identity[UTF8String]), - parsedOptions.parseMode, - schema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + @transient lazy val parser = { + val parsedOptions = new JSONOptions(options, timeZoneId.get) + val rawParser = new JacksonParser(nullableSchema, parsedOptions) + val createParser = CreateJacksonParser.utf8String _ + + new FailureSafeParser[UTF8String]( + input => rawParser.parse(input, createParser, identity[UTF8String]), + parsedOptions.parseMode, + schema, + parsedOptions.columnNameOfCorruptRecord, + parsedOptions.multiLine) + } override def dataType: DataType = nullableSchema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 1e816ce81af78..5f2be58513b2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -36,10 +36,9 @@ class FailureSafeParser[IN]( private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = schema match { case struct: StructType => val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) - val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) - val resultRow = new 
GenericInternalRow(struct.length) - val nullResult = new GenericInternalRow(struct.length) if (corruptFieldIndex.isDefined) { + val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) + val resultRow = new GenericInternalRow(struct.length) (row, badRecord) => { var i = 0 while (i < actualSchema.length) { @@ -51,6 +50,7 @@ class FailureSafeParser[IN]( resultRow } } else { + val nullResult = new GenericInternalRow(struct.length) (row, _) => row.getOrElse(nullResult) } case _ => (row, _) => row.getOrElse(new GenericInternalRow(1)) From 104ee443c3842dd6695878d823a55e7b3a5cecb1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 28 Aug 2018 20:46:43 +0200 Subject: [PATCH 09/27] Restricting to PERMISSIVE and FAILFAST modes so far --- .../sql/catalyst/expressions/jsonExpressions.scala | 5 ++++- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 12 +++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 728533a6925a0..a5982eebce6d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -563,12 +563,15 @@ case class JsonToStructs( @transient lazy val parser = { val parsedOptions = new JSONOptions(options, timeZoneId.get) + val mode = parsedOptions.parseMode + require(mode == PermissiveMode || mode == FailFastMode, + s"The function supports only the ${PermissiveMode.name} and ${FailFastMode.name} modes.") val rawParser = new JacksonParser(nullableSchema, parsedOptions) val createParser = CreateJacksonParser.utf8String _ new FailureSafeParser[UTF8String]( input => rawParser.parse(input, createParser, identity[UTF8String]), - parsedOptions.parseMode, + mode, schema, parsedOptions.columnNameOfCorruptRecord, parsedOptions.multiLine) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 504f6b8cb3d36..b8b939fec0139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -557,14 +557,16 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), Row(Row(null)) :: Row(Row(2)) :: Nil) - val exceptionOne = intercept[SparkException] { + val exception1 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() }.getMessage - assert(exceptionOne.contains( + assert(exception1.contains( "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST.")) - checkAnswer( - df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))), - Row(null) :: Row(Row(2)) :: Nil) + val exception2 = intercept[IllegalArgumentException] { + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() + }.getMessage + assert(exception2.contains( + "The function supports only the PERMISSIVE and FAILFAST modes.")) } } From b1894d2e3e01f54a8962095778e3777643cbef76 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 29 Aug 2018 22:34:58 +0200 Subject: [PATCH 10/27] Changing the test according to the PERMISSIVE mode --- R/pkg/tests/fulltests/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 5ad5d78d3ed17..509f689ac521e 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1694,7 +1694,7 @@ test_that("column functions", { df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) s <- collect(select(df, from_json(df$col, schema2))) - expect_equal(s[[1]][[1]], NA) + expect_equal(s[[1]][[1]]$date, NA) s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy"))) expect_is(s[[1]][[1]]$date, "Date") expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21") From c3091b3eda06fd5555b798c6d22e7efc6a99088e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 30 Aug 2018 12:22:17 +0200 Subject: [PATCH 11/27] Renaming schema -> dataType --- .../apache/spark/sql/catalyst/util/FailureSafeParser.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 5f2be58513b2a..a9fa4f04a730b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -26,14 +26,14 @@ import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, - schema: DataType, + dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. 
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = schema match { + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match { case struct: StructType => val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) if (corruptFieldIndex.isDefined) { @@ -56,7 +56,7 @@ class FailureSafeParser[IN]( case _ => (row, _) => row.getOrElse(new GenericInternalRow(1)) } - private val skipParsing = !isMultiLine && mode == PermissiveMode && (schema match { + private val skipParsing = !isMultiLine && mode == PermissiveMode && (dataType match { case struct: StructType => struct.isEmpty case _ => false }) From 55be20b3809f16396a8940bc507f692869e9074e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 30 Aug 2018 13:17:41 +0200 Subject: [PATCH 12/27] Throwing AnalysisException instead of IllegalArgumentException --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 6 ++++-- .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index a5982eebce6d6..accb4f6f765c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -564,8 +564,10 @@ case class JsonToStructs( @transient lazy val parser = { val parsedOptions = new JSONOptions(options, timeZoneId.get) val mode = parsedOptions.parseMode - require(mode == PermissiveMode || mode == FailFastMode, - s"The functions supports only the ${PermissiveMode.name} and ${FailFastMode.name} modes.") + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_json() doesn't support the ${mode.name} mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") + } val rawParser = new JacksonParser(nullableSchema, parsedOptions) val createParser = CreateJacksonParser.utf8String _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index b8b939fec0139..04e7527f4cade 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -563,10 +563,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(exception1.contains( "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) - val exception2 = intercept[IllegalArgumentException] { + val exception2 = intercept[AnalysisException] { df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() }.getMessage assert(exception2.contains( - "The functions supports only the PERMISSIVE and FAILFAST modes.")) + "from_json() doesn't support the DROPMALFORMED mode. 
" + + "Acceptable modes are PERMISSIVE and FAILFAST.")) } } From ce49b242b97e603acac20cd4ef6e840bede46e12 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 31 Aug 2018 11:44:40 +0200 Subject: [PATCH 13/27] Check that the AnalysisException is thrown during producing a plan --- .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 04e7527f4cade..9f17c2d7d34ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -564,7 +564,8 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) val exception2 = intercept[AnalysisException] { - df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))) + .queryExecution.executedPlan }.getMessage assert(exception2.contains( "from_json() doesn't support the DROPMALFORMED mode. " + From 57eb59f3f47b4ffc205daa257bc05347cc763fb4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Sep 2018 21:47:00 +0200 Subject: [PATCH 14/27] Replacing AnalysisException by IllegalArgumentException --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index accb4f6f765c7..09b03bf8551a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -565,7 +565,7 @@ case class JsonToStructs( val parsedOptions = new JSONOptions(options, timeZoneId.get) val mode = parsedOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { - throw new AnalysisException(s"from_json() doesn't support the ${mode.name} mode. " + + throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") } val rawParser = new JacksonParser(nullableSchema, parsedOptions) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 9f17c2d7d34ee..f16c4b2d1d02a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -563,9 +563,9 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(exception1.contains( "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) - val exception2 = intercept[AnalysisException] { + val exception2 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))) - .queryExecution.executedPlan + .collect() }.getMessage assert(exception2.contains( "from_json() doesn't support the DROPMALFORMED mode. 
" + From 20b75221fedebf4dc745b54a56130a4f363fbf1c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 18 Sep 2018 22:02:01 +0200 Subject: [PATCH 15/27] Fix a test --- .../expressions/JsonExpressionsSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index f6d8510a5ce8b..6a804fc468cc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Calendar +import org.scalatest.exceptions.TestFailedException + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.TreeNodeException @@ -412,11 +414,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with InternalRow(null) ) - // Other modes should still return `null`. - checkEvaluation( - JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId), - InternalRow(null) - ) + val msg = intercept[TestFailedException] { + checkEvaluation( + JsonToStructs(schema, Map("mode" -> FailFastMode.name), Literal(jsonData), gmtId), + InternalRow(null) + ) + }.getCause.getMessage + assert(msg.contains("Malformed records are detected in record parsing. Parse Mode: FAILFAST")) } test("from_json - input=array, schema=array, output=array") { From 63b8b6654044ce0fc2dfb210d8634120d0e82890 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 18 Sep 2018 22:03:40 +0200 Subject: [PATCH 16/27] Improving the test --- .../catalyst/expressions/JsonExpressionsSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 6a804fc468cc8..0767648c1de03 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -21,7 +21,7 @@ import java.util.Calendar import org.scalatest.exceptions.TestFailedException -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.plans.PlanTestBase @@ -414,13 +414,15 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with InternalRow(null) ) - val msg = intercept[TestFailedException] { + val exception = intercept[TestFailedException] { checkEvaluation( JsonToStructs(schema, Map("mode" -> FailFastMode.name), Literal(jsonData), gmtId), InternalRow(null) ) - }.getCause.getMessage - assert(msg.contains("Malformed records are detected in record parsing. Parse Mode: FAILFAST")) + }.getCause + assert(exception.isInstanceOf[SparkException]) + assert(exception.getMessage.contains( + "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST")) } test("from_json - input=array, schema=array, output=array") { From a5489f53f764cf5f7e270041d3c5ea1b1fffaa7c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 18 Sep 2018 22:13:03 +0200 Subject: [PATCH 17/27] Removing the BadRecordException handler --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 09b03bf8551a3..723ab087fa46e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -606,11 +606,7 @@ case class JsonToStructs( // deal with this but produces `Nil`. if (json.toString.trim.isEmpty) return null - try { - converter(parser.parse(json.asInstanceOf[UTF8String])) - } catch { - case _: BadRecordException => null - } + converter(parser.parse(json.asInstanceOf[UTF8String])) } override def inputTypes: Seq[AbstractDataType] = StringType :: Nil From 99049038aed277c647c6e7888019b72e9708a398 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 18 Sep 2018 22:36:29 +0200 Subject: [PATCH 18/27] Updating test's title --- .../spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 0767648c1de03..ba3b9ff887f7c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -460,7 +460,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } - test("from_json - input=array, schema=struct, output=null") { + test("from_json - input=array, schema=struct, output=single row") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(1) From bda3a4e9462e988d771d180926471561c6da4886 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 5 Oct 2018 23:53:14 +0200 Subject: [PATCH 19/27] Addressing Hyukjin's review comments --- .../sql/catalyst/util/FailureSafeParser.scala | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index a9fa4f04a730b..0c8caa522f28d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -33,26 +33,32 @@ class FailureSafeParser[IN]( // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. 
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match { - case struct: StructType => - val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) - if (corruptFieldIndex.isDefined) { - val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) - val resultRow = new GenericInternalRow(struct.length) - (row, badRecord) => { - var i = 0 - while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(struct.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 - } - resultRow(corruptFieldIndex.get) = badRecord() - resultRow + private def structToResultRow( + struct: StructType) + : (Option[InternalRow], () => UTF8String) => InternalRow = { + val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) + + if (corruptFieldIndex.isDefined) { + val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) + val resultRow = new GenericInternalRow(struct.length) + (row, badRecord) => { + var i = 0 + while (i < actualSchema.length) { + val from = actualSchema(i) + resultRow(struct.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull + i += 1 } - } else { - val nullResult = new GenericInternalRow(struct.length) - (row, _) => row.getOrElse(nullResult) + resultRow(corruptFieldIndex.get) = badRecord() + resultRow } + } else { + val nullResult = new GenericInternalRow(struct.length) + (row, _) => row.getOrElse(nullResult) + } + } + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match { + case struct: StructType => structToResultRow(struct) case _ => (row, _) => row.getOrElse(new GenericInternalRow(1)) } From fa20fd2689f06c31dba02cc77798fb81352b0fea Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 9 Oct 2018 14:16:48 +0200 Subject: [PATCH 20/27] Produce null and put input to the corrupted column if an input array has more than 1 element for struct schema --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../apache/spark/sql/catalyst/json/JacksonParser.scala | 7 ++++++- .../sql/catalyst/expressions/JsonExpressionsSuite.scala | 9 ++++++--- .../scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../sql/execution/datasources/json/JsonFileFormat.scala | 2 +- .../spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- 6 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 723ab087fa46e..51d3615c6ac08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -568,7 +568,7 @@ case class JsonToStructs( throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. 
" + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") } - val rawParser = new JacksonParser(nullableSchema, parsedOptions) + val rawParser = new JacksonParser(nullableSchema, parsedOptions, explodeArray = false) val createParser = CreateJacksonParser.utf8String _ new FailureSafeParser[UTF8String]( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 918c9e71ad37a..2a74002c55308 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -38,7 +38,8 @@ import org.apache.spark.util.Utils */ class JacksonParser( schema: DataType, - val options: JSONOptions) extends Logging { + val options: JSONOptions, + explodeArray: Boolean) extends Logging { import JacksonUtils._ import com.fasterxml.jackson.core.JsonToken._ @@ -90,6 +91,10 @@ class JacksonParser( // in such an array as a row, this case is possible. if (array.numElements() == 0) { Nil + } else if (array.numElements() > 1 && !explodeArray) { + throw new RuntimeException("Found an array with more than one element for " + + s"the specified schema ${st.catalogString}. " + + s"The array cannot be converted to the type.") } else { array.toArray[InternalRow](schema).toSeq } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index ba3b9ff887f7c..0c9ef82425b34 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -462,9 +462,12 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=array, schema=struct, output=single row") { val input = """[{"a": 1}, {"a": 2}]""" - val schema = StructType(StructField("a", IntegerType) :: Nil) - val output = InternalRow(1) - checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) + val corrupted = "corrupted" + val schema = new StructType().add("a", IntegerType).add(corrupted, StringType) + StructType(StructField("a", IntegerType) :: Nil) + val output = InternalRow(null, UTF8String.fromString(input)) + val options = Map("columnNameOfCorruptRecord" -> corrupted) + checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output) } test("from_json - input=empty array, schema=struct, output=null") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4f6d8b8a0c34a..4d7d6ef76abdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val createParser = CreateJacksonParser.string _ val parsed = jsonDataset.rdd.mapPartitions { iter => - val rawParser = new JacksonParser(actualSchema, parsedOptions) + val rawParser = new JacksonParser(actualSchema, parsedOptions, explodeArray = true) val parser = new FailureSafeParser[String]( input => rawParser.parse(input, createParser, UTF8String.fromString), parsedOptions.parseMode, diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index a9241afba537b..14f32036219df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } (file: PartitionedFile) => { - val parser = new JacksonParser(actualSchema, parsedOptions) + val parser = new JacksonParser(actualSchema, parsedOptions, explodeArray = true) JsonDataSource(parsedOptions).readFile( broadcastedHadoopConf.value.value, file, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 43e1a616e363c..ea4be27239cb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") val dummySchema = StructType(Seq.empty) - val parser = new JacksonParser(dummySchema, dummyOption) + val parser = new JacksonParser(dummySchema, dummyOption, explodeArray = true) Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => jsonParser.nextToken() From 2663696f994632fc3adf86501b63fd78797b58f5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 9 Oct 2018 21:34:31 +0200 Subject: [PATCH 21/27] Removing special handling of empty strings --- .../expressions/jsonExpressions.scala | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 51d3615c6ac08..c01624ebbabde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -585,27 +585,6 @@ case class JsonToStructs( copy(timeZoneId = Option(timeZoneId)) override def nullSafeEval(json: Any): Any = { - // When input is, - // - `null`: `null`. - // - invalid json: `null`. - // - empty string: `null`. - // - // When the schema is array, - // - json array: `Array(Row(...), ...)` - // - json object: `Array(Row(...))` - // - empty json array: `Array()`. - // - empty json object: `Array(Row(null))`. - // - // When the schema is a struct, - // - json object/array with single element: `Row(...)` - // - json array with multiple elements: `null` - // - empty json array: `null`. - // - empty json object: `Row(null)`. - - // We need `null` if the input string is an empty string. `JacksonParser` can - // deal with this but produces `Nil`. 
- if (json.toString.trim.isEmpty) return null - converter(parser.parse(json.asInstanceOf[UTF8String])) } From b84b343dc3ee72ced7f422e417cb575af1d872c7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 9 Oct 2018 21:51:25 +0200 Subject: [PATCH 22/27] Don't allow parsing arrays as structs by from_json --- .../sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../apache/spark/sql/catalyst/json/JacksonParser.scala | 10 ++++------ .../catalyst/expressions/JsonExpressionsSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../execution/datasources/json/JsonFileFormat.scala | 2 +- .../sql/execution/datasources/json/JsonSuite.scala | 2 +- 6 files changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c01624ebbabde..1587dd6d62f25 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -568,7 +568,7 @@ case class JsonToStructs( throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") } - val rawParser = new JacksonParser(nullableSchema, parsedOptions, explodeArray = false) + val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false) val createParser = CreateJacksonParser.utf8String _ new FailureSafeParser[UTF8String]( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 2a74002c55308..57c7f2faf3107 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -39,7 +39,7 @@ import org.apache.spark.util.Utils class JacksonParser( schema: DataType, val options: JSONOptions, - explodeArray: Boolean) extends Logging { + allowArrayAsStructs: Boolean) extends Logging { import JacksonUtils._ import com.fasterxml.jackson.core.JsonToken._ @@ -85,19 +85,17 @@ class JacksonParser( // List([str_a_1,null]) // List([str_a_2,null], [null,str_b_3]) // - case START_ARRAY => + case START_ARRAY if allowArrayAsStructs => val array = convertArray(parser, elementConverter) // Here, as we support reading top level JSON arrays and take every element // in such an array as a row, this case is possible. if (array.numElements() == 0) { Nil - } else if (array.numElements() > 1 && !explodeArray) { - throw new RuntimeException("Found an array with more than one element for " + - s"the specified schema ${st.catalogString}. 
" + - s"The array cannot be converted to the type.") } else { array.toArray[InternalRow](schema).toSeq } + case START_ARRAY => + throw new RuntimeException("Parsing JSON arrays as structs is forbidden.") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 0c9ef82425b34..f16716a5e9988 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -456,7 +456,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=array of single object, schema=struct, output=single row") { val input = """[{"a": 1}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) - val output = InternalRow(1) + val output = InternalRow(null) checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } @@ -473,7 +473,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=empty array, schema=struct, output=null") { val input = """[]""" val schema = StructType(StructField("a", IntegerType) :: Nil) - val output = null + val output = InternalRow(null) checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4d7d6ef76abdb..95c97e5c9433c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val createParser = CreateJacksonParser.string _ val parsed = jsonDataset.rdd.mapPartitions { iter => - val rawParser = new JacksonParser(actualSchema, parsedOptions, explodeArray = true) + val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true) val parser = new FailureSafeParser[String]( input => rawParser.parse(input, createParser, UTF8String.fromString), parsedOptions.parseMode, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 14f32036219df..1f7c9d73f19fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } (file: PartitionedFile) => { - val parser = new JacksonParser(actualSchema, parsedOptions, explodeArray = true) + val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true) JsonDataSource(parsedOptions).readFile( broadcastedHadoopConf.value.value, file, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ea4be27239cb8..06032ded42a53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") val dummySchema = StructType(Seq.empty) - val parser = new JacksonParser(dummySchema, dummyOption, explodeArray = true) + val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true) Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => jsonParser.nextToken() From e22b974152c689442f458df755a06c98f4475f84 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 10 Oct 2018 13:39:34 +0200 Subject: [PATCH 23/27] Modified a test to check the spark.sql.columnNameOfCorruptRecord config --- .../expressions/jsonExpressions.scala | 3 +- .../apache/spark/sql/JsonFunctionsSuite.scala | 47 +++++++++++-------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 1587dd6d62f25..1c1da274ed320 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -561,8 +561,9 @@ case class JsonToStructs( (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } + val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) @transient lazy val parser = { - val parsedOptions = new JSONOptions(options, timeZoneId.get) + val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord) val mode = parsedOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index f16c4b2d1d02a..797b274f42cdd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -21,6 +21,7 @@ import collection.JavaConverters._ import org.apache.spark.SparkException import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -550,25 +551,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { } test("from_json invalid json - check modes") { - val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() - val schema = new StructType().add("a", IntegerType) - - checkAnswer( - df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), - Row(Row(null)) :: Row(Row(2)) :: Nil) - - val exception1 = intercept[SparkException] { - df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() - }.getMessage - assert(exception1.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) - - val exception2 = intercept[SparkException] { - df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))) - .collect() - }.getMessage - assert(exception2.contains( - "from_json() doesn't support the DROPMALFORMED mode. 
" + - "Acceptable modes are PERMISSIVE and FAILFAST.")) + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("_unparsed", StringType) + val badRec = """{"a" 1, "b": 11}""" + val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS() + + checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil) + + val exception1 = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() + }.getMessage + assert(exception1.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + + val exception2 = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))) + .collect() + }.getMessage + assert(exception2.contains( + "from_json() doesn't support the DROPMALFORMED mode. " + + "Acceptable modes are PERMISSIVE and FAILFAST.")) + } } } From 4157141d7e978cc912b58ed956dbc9c5e7c4ce97 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 10 Oct 2018 13:42:19 +0200 Subject: [PATCH 24/27] Improving tests --- .../spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index f16716a5e9988..304642161146b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -464,13 +464,12 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with val input = """[{"a": 1}, {"a": 2}]""" val corrupted = "corrupted" val schema = new StructType().add("a", IntegerType).add(corrupted, StringType) - StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(null, UTF8String.fromString(input)) val options = Map("columnNameOfCorruptRecord" -> corrupted) checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output) } - test("from_json - input=empty array, schema=struct, output=null") { + test("from_json - input=empty array, schema=struct, output=single row with null") { val input = """[]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(null) From 54be09c3e7f112ec414c3b1352787fd33f734198 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 11 Oct 2018 09:59:32 +0200 Subject: [PATCH 25/27] Fix a python test --- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 32d7f02f61883..2694e777d8266 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2305,7 +2305,7 @@ def from_json(col, schema, options={}): [Row(json=[Row(a=1)])] >>> schema = schema_of_json(lit('''{"a": 0}''')) >>> df.select(from_json(df.value, schema).alias("json")).collect() - [Row(json=Row(a=1))] + [Row(json=Row(a=None))] >>> data = [(1, '''[1, 2, 3]''')] >>> schema = ArrayType(IntegerType()) >>> df = spark.createDataFrame(data, ("key", "value")) From 3f04f7f8206f95362fa8640c1c4aec41cc2d549b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 24 Oct 2018 11:19:00 +0800 Subject: [PATCH 26/27] add migration guide 
---
 docs/sql-migration-guide-upgrade.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b8b9ad8438554..dfa35b88369cb 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -13,6 +13,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.
 
+  - Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode is `PERMISSIVE`. In previous versions, the behavior of `from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially when processing malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.

From b2988c76456627e245bd8e157c76197fe4cc0ade Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 24 Oct 2018 11:41:39 +0800
Subject: [PATCH 27/27] address comments

---
 .../expressions/jsonExpressions.scala         |  7 ++++-
 .../sql/catalyst/util/FailureSafeParser.scala | 31 +++++++------------
 2 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1c1da274ed320..e966924293cf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -572,10 +572,15 @@ case class JsonToStructs(
     val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false)
     val createParser = CreateJacksonParser.utf8String _
 
+    val parserSchema = nullableSchema match {
+      case s: StructType => s
+      case other => StructType(StructField("value", other) :: Nil)
+    }
+
     new FailureSafeParser[UTF8String](
       input => rawParser.parse(input, createParser, identity[UTF8String]),
       mode,
-      schema,
+      parserSchema,
       parsedOptions.columnNameOfCorruptRecord,
       parsedOptions.multiLine)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 0c8caa522f28d..fecfff5789a5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -20,52 +20,43 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
-    dataType: DataType,
+    schema: StructType,
     columnNameOfCorruptRecord: String,
     isMultiLine: Boolean) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
   // This function takes 2 parameters: an optional partial result, and the bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return the partial result or a
   // row with all fields null. If the given schema contains a field for corrupted record, we will
   // set the bad record to this field, and set other fields according to the partial result or null.
-  private def structToResultRow(
-      struct: StructType)
-    : (Option[InternalRow], () => UTF8String) => InternalRow = {
-    val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord)
-
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
     if (corruptFieldIndex.isDefined) {
-      val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord))
-      val resultRow = new GenericInternalRow(struct.length)
       (row, badRecord) => {
         var i = 0
         while (i < actualSchema.length) {
           val from = actualSchema(i)
-          resultRow(struct.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
           i += 1
         }
         resultRow(corruptFieldIndex.get) = badRecord()
         resultRow
       }
     } else {
-      val nullResult = new GenericInternalRow(struct.length)
       (row, _) => row.getOrElse(nullResult)
     }
   }
 
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match {
-    case struct: StructType => structToResultRow(struct)
-    case _ => (row, _) => row.getOrElse(new GenericInternalRow(1))
-  }
-
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && (dataType match {
-    case struct: StructType => struct.isEmpty
-    case _ => false
-  })
+  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
 
   def parse(input: IN): Iterator[InternalRow] = {
     try {