From a0a7091e58d84d1927b7e17511414bf952c73cf5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 25 Feb 2017 21:10:45 +0900 Subject: [PATCH 1/8] Support ArrayType for json array --- python/pyspark/sql/functions.py | 13 +++- .../expressions/jsonExpressions.scala | 62 +++++++++++++-- .../sql/catalyst/json/JacksonParser.scala | 11 ++- .../expressions/JsonExpressionsSuite.scala | 38 ++++++++- .../org/apache/spark/sql/functions.scala | 78 +++++++++++++++---- .../apache/spark/sql/JsonFunctionsSuite.scala | 31 +++++++- 6 files changed, 202 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 426a4a8c93a6..53c60b460ca4 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1773,11 +1773,11 @@ def json_tuple(col, *fields): @since(2.1) def from_json(col, schema, options={}): """ - Parses a column containing a JSON string into a [[StructType]] with the - specified schema. Returns `null`, in the case of an unparseable string. + Parses a column containing a JSON object or array string into a [[StructType]] or [[ArrayType]] + with the specified schema. Returns `null`, in the case of an unparseable string. - :param col: string column in json format - :param schema: a StructType to use when parsing the json column + :param col: string column in json object or array format + :param schema: a StructType or ArrayType to use when parsing the json column :param options: options to control parsing. accepts the same options as the json datasource >>> from pyspark.sql.types import * @@ -1786,6 +1786,11 @@ def from_json(col, schema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_json(df.value, schema).alias("json")).collect() [Row(json=Row(a=1))] + >>> data = [(1, '''[{"a": 1}]''')] + >>> schema = ArrayType(StructType([StructField("a", IntegerType())])) + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(from_json(df.value, schema).alias("json")).collect() + [Row(json=[Row(a=1)])] """ sc = SparkContext._active_spark_context diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 1e690a446951..8f154f96e075 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.ParseModes +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -480,24 +480,54 @@ case class JsonTuple(children: Seq[Expression]) } /** - * Converts an json input string to a [[StructType]] with the specified schema. + * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema. 
*/ case class JsonToStruct( - schema: StructType, + schema: DataType, options: Map[String, String], child: Expression, timeZoneId: Option[String] = None) extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true - def this(schema: StructType, options: Map[String, String], child: Expression) = + def this(schema: DataType, options: Map[String, String], child: Expression) = this(schema, options, child, None) + override def checkInputDataTypes(): TypeCheckResult = { + super.checkInputDataTypes() + schema match { + case st: StructType => TypeCheckResult.TypeCheckSuccess + case ArrayType(st: StructType, _) => TypeCheckResult.TypeCheckSuccess + case _ => TypeCheckResult.TypeCheckFailure( + s"Input schema ${schema.simpleString} must be a struct or an array of struct.") + } + } + + @transient + lazy val rowSchema = schema match { + case st: StructType => st + case ArrayType(st: StructType, _) => st + } + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = schema match { + case _: StructType => + // These are always produced from json objects by `objectSupport` in `JacksonParser`. + (rows: Seq[InternalRow]) => rows.head + + case ArrayType(_: StructType, _) => + // These are always produced from json arrays by `arraySupport` in `JacksonParser`. + (rows: Seq[InternalRow]) => new GenericArrayData(rows) + } + @transient lazy val parser = new JacksonParser( - schema, - new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) + rowSchema, + new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get), + objectSupport = schema.isInstanceOf[StructType], + arraySupport = schema.isInstanceOf[ArrayType]) override def dataType: DataType = schema @@ -505,11 +535,27 @@ case class JsonToStruct( copy(timeZoneId = Option(timeZoneId)) override def nullSafeEval(json: Any): Any = { + // `null` related behavior of this expression is as below: + // When input is, + // - `null`: the output is `null`. + // - invalid json: the output is `null`. + // - empty string: the output is `null`. + // - empty json object: the output is Row(`null`). + // - empty json array: the output is `Nil`. + // + // Note that, it returns `null` if the schema is not matched. If the schema is + // `StructType`, then the json string should be json objects. If the schema is + // array of structs, then the string should be json arrays. + + // We need `null` if the input string is an empty string. `JacksonParser` can + // deal with this but produces `Nil`. 
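    // A minimal sketch of the cases above, mirroring the expression tests added in this
    // patch (`structSchema` and `gmtId` are illustrative names, not defined in this file):
    //   JsonToStruct(structSchema, Map.empty, Literal("""{"a": 1}"""), gmtId)              -> InternalRow(1)
    //   JsonToStruct(ArrayType(structSchema), Map.empty, Literal("""[{"a": 1}]"""), gmtId) -> GenericArrayData(InternalRow(1) :: Nil)
    //   JsonToStruct(structSchema, Map.empty, Literal("not a json string"), gmtId)         -> null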
+ if (json.toString.trim.isEmpty) return null + try { - parser.parse( + converter(parser.parse( json.asInstanceOf[UTF8String], CreateJacksonParser.utf8String, - identity[UTF8String]).headOption.orNull + identity[UTF8String])) } catch { case _: SparkSQLJsonProcessingException => null } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 9b80c0fc87c9..28de182446fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -39,7 +39,12 @@ private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeE */ class JacksonParser( schema: StructType, - options: JSONOptions) extends Logging { + options: JSONOptions, + arraySupport: Boolean = true, + objectSupport: Boolean = true) extends Logging { + + // One of both should be true. Otherwise, this parser can't parse any json. + require(arraySupport || objectSupport) import JacksonUtils._ import ParseModes._ @@ -166,7 +171,7 @@ class JacksonParser( val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter).toArray (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) { - case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil + case START_OBJECT if objectSupport => convertObject(parser, st, fieldConverters) :: Nil // SPARK-3308: support reading top level JSON arrays and take every element // in such an array as a row // @@ -180,7 +185,7 @@ class JacksonParser( // List([str_a_1,null]) // List([str_a_2,null], [null,str_b_3]) // - case START_ARRAY => + case START_ARRAY if arraySupport => val array = convertArray(parser, elementConverter) // Here, as we support reading top level JSON arrays and take every element // in such an array as a row, this case is possible. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 0c46819cdb9c..c9865d03ddc6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -22,7 +22,7 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes} -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -372,6 +372,42 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { ) } + test("from_json - array") { + val jsonDataArr = """[{"a": 1}, {"a": 2}]""" + val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val expected = + InternalRow.fromSeq(1 :: Nil) :: + InternalRow.fromSeq(2 :: Nil) :: Nil + checkEvaluation(JsonToStruct( + schema, Map.empty, Literal(jsonDataArr), gmtId), expected) + + // Empty array produces empty collection. 
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal("[ ]"), gmtId), Nil) + } + + test("from_json empty object") { + val jsonData = """{ }""" + val schema = StructType(StructField("a", IntegerType) :: Nil) + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), InternalRow(null)) + } + + test("from_json schema mismatch - struct") { + val jsonData = """[{"a" 1}]""" + val schema = new StructType().add("a", IntegerType) + + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null) + } + + test("from_json schema mismatch - array") { + val jsonData = """{"a" 1}""" + val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null) + } + test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 24ed906d3368..f6aea0c388b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2953,11 +2953,11 @@ object functions { } /** - * (Scala-specific) Parses a column containing a JSON string into a `StructType` with the + * (Scala-specific) Parses a column containing a JSON object string into a `StructType` with the * specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON data. - * @param schema the schema to use when parsing the json string + * @param e a string column containing JSON object data. + * @param schema the schema to use when parsing the json object string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * @@ -2969,11 +2969,27 @@ object functions { } /** - * (Java-specific) Parses a column containing a JSON string into a `StructType` with the + * (Scala-specific) Parses a column containing a JSON array string into a `ArrayType` with the * specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON data. - * @param schema the schema to use when parsing the json string + * @param e a string column containing JSON array data. + * @param schema the schema to use when parsing the json array string + * @param options options to control how the json is parsed. accepts the same options and the + * json data source. + * + * @group collection_funcs + * @since 2.2.0 + */ + def from_json(e: Column, schema: ArrayType, options: Map[String, String]): Column = withExpr { + JsonToStruct(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a JSON object string into a `StructType` with the + * specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing JSON object data. + * @param schema the schema to use when parsing the json object string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * @@ -2984,11 +3000,26 @@ object functions { from_json(e, schema, options.asScala.toMap) /** - * Parses a column containing a JSON string into a `StructType` with the specified schema. + * (Java-specific) Parses a column containing a JSON array string into a `ArrayType` with the + * specified schema. Returns `null`, in the case of an unparseable string. 
+ * + * @param e a string column containing JSON array data. + * @param schema the schema to use when parsing the json array string + * @param options options to control how the json is parsed. accepts the same options and the + * json data source. + * + * @group collection_funcs + * @since 2.2.0 + */ + def from_json(e: Column, schema: ArrayType, options: java.util.Map[String, String]): Column = + from_json(e, schema, options.asScala.toMap) + + /** + * Parses a column containing a JSON object string into a `StructType` with the specified schema. * Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON data. - * @param schema the schema to use when parsing the json string + * @param e a string column containing JSON object data. + * @param schema the schema to use when parsing the json object string * * @group collection_funcs * @since 2.1.0 @@ -2997,18 +3028,37 @@ object functions { from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON string into a `StructType` with the specified schema. + * Parses a column containing a JSON array string into a `ArrayType` with the specified schema. * Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON data. + * @param e a string column containing JSON array data. + * @param schema the schema to use when parsing the json array string + * + * @group collection_funcs + * @since 2.2.0 + */ + def from_json(e: Column, schema: ArrayType): Column = + from_json(e, schema, Map.empty[String, String]) + + /** + * Parses a column containing a JSON object or array string into a `StructType` or `ArrayType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing JSON object or array data. 
* @param schema the schema to use when parsing the json string as a json string * * @group collection_funcs * @since 2.1.0 */ - def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = - from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options) - + def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = { + DataType.fromJson(schema) match { + case st: StructType => from_json(e, st, options) + case at @ ArrayType(_: StructType, _) => from_json(e, at, options) + case dt => + throw new IllegalArgumentException( + s"Input schema ${dt.simpleString} must be a struct or an array of struct.") + } + } /** * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 9c39b3c7f09b..a5364c06ad30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.spark.sql.functions.{from_json, struct, to_json} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType, TimestampType} +import org.apache.spark.sql.types._ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -133,6 +135,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(null) :: Nil) } + test("from_json invalid schema") { + val df = Seq("""{"a" 1}""").toDS() + val schema = ArrayType(StringType) + val message = intercept[IllegalArgumentException] { + df.select(from_json($"value", schema.json, Map.empty[String, String].asJava)) + }.getMessage + + assert(message.contains( + "Input schema array must be a struct or an array of struct.")) + } + test("to_json") { val df = Seq(Tuple1(Tuple1(1))).toDF("a") @@ -141,6 +154,22 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row("""{"_1":1}""") :: Nil) } + test("from_json array support") { + val df = Seq("""[{"a": 1, "b": "a"}, {"a": 2}, { }]""").toDS() + val schema = ArrayType( + StructType( + StructField("a", IntegerType) :: + StructField("b", StringType) :: Nil)) + + checkAnswer( + df.select(from_json($"value", schema)), + Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) + + checkAnswer( + df.select(from_json($"value", schema.json, Map.empty[String, String].asJava)), + Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) + } + test("to_json with option") { val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") From ef007f9afc8fdbf77aa7f9281262c8d27e62dc31 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 26 Feb 2017 13:25:48 +0900 Subject: [PATCH 2/8] Minor cleanups --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 6 +++--- .../src/main/scala/org/apache/spark/sql/functions.scala | 2 +- .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 8f154f96e075..2a4656c456d4 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -496,10 +496,10 @@ case class JsonToStruct( override def checkInputDataTypes(): TypeCheckResult = { super.checkInputDataTypes() schema match { - case st: StructType => TypeCheckResult.TypeCheckSuccess - case ArrayType(st: StructType, _) => TypeCheckResult.TypeCheckSuccess + case _: StructType => TypeCheckResult.TypeCheckSuccess + case ArrayType(_: StructType, _) => TypeCheckResult.TypeCheckSuccess case _ => TypeCheckResult.TypeCheckFailure( - s"Input schema ${schema.simpleString} must be a struct or an array of struct.") + s"Input schema ${schema.simpleString} must be a struct or an array of structs.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index f6aea0c388b3..0436ec2ff717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3056,7 +3056,7 @@ object functions { case at @ ArrayType(_: StructType, _) => from_json(e, at, options) case dt => throw new IllegalArgumentException( - s"Input schema ${dt.simpleString} must be a struct or an array of struct.") + s"Input schema ${dt.simpleString} must be a struct or an array of structs.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index a5364c06ad30..e1fd64d3636e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -143,7 +143,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { }.getMessage assert(message.contains( - "Input schema array must be a struct or an array of struct.")) + "Input schema array must be a struct or an array of structs.")) } test("to_json") { From 470d87969d8fa2de6adfd3765086e03ec8f12252 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 26 Feb 2017 21:19:08 +0900 Subject: [PATCH 3/8] Cleaner --- .../sql/catalyst/expressions/jsonExpressions.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 2a4656c456d4..e12e9f153f0c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -493,14 +493,11 @@ case class JsonToStruct( def this(schema: DataType, options: Map[String, String], child: Expression) = this(schema, options, child, None) - override def checkInputDataTypes(): TypeCheckResult = { - super.checkInputDataTypes() - schema match { - case _: StructType => TypeCheckResult.TypeCheckSuccess - case ArrayType(_: StructType, _) => TypeCheckResult.TypeCheckSuccess - case _ => TypeCheckResult.TypeCheckFailure( - s"Input schema ${schema.simpleString} must be a struct or an array of structs.") - } + override def checkInputDataTypes(): TypeCheckResult = schema match { + case _: StructType | ArrayType(_: StructType, _) => + super.checkInputDataTypes() + case _ => TypeCheckResult.TypeCheckFailure( + s"Input schema ${schema.simpleString} must be a 
struct or an array of structs.") } @transient From 72d641018635aae94cc89e216e30540233d461f4 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 28 Feb 2017 15:16:54 +0900 Subject: [PATCH 4/8] Permissively support array in from_json and make a one for DataType --- python/pyspark/sql/functions.py | 4 +- .../expressions/jsonExpressions.scala | 32 ++++---- .../sql/catalyst/json/JacksonParser.scala | 12 +-- .../expressions/JsonExpressionsSuite.scala | 52 ++++++++----- .../org/apache/spark/sql/functions.scala | 75 +++++++++---------- .../apache/spark/sql/JsonFunctionsSuite.scala | 8 +- 6 files changed, 92 insertions(+), 91 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 53c60b460ca4..376b86ea69bd 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1773,10 +1773,10 @@ def json_tuple(col, *fields): @since(2.1) def from_json(col, schema, options={}): """ - Parses a column containing a JSON object or array string into a [[StructType]] or [[ArrayType]] + Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]] with the specified schema. Returns `null`, in the case of an unparseable string. - :param col: string column in json object or array format + :param col: string column in json format :param schema: a StructType or ArrayType to use when parsing the json column :param options: options to control parsing. accepts the same options as the json datasource diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index e12e9f153f0c..dbff62efdddb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -510,11 +510,8 @@ case class JsonToStruct( @transient lazy val converter = schema match { case _: StructType => - // These are always produced from json objects by `objectSupport` in `JacksonParser`. - (rows: Seq[InternalRow]) => rows.head - + (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null case ArrayType(_: StructType, _) => - // These are always produced from json arrays by `arraySupport` in `JacksonParser`. (rows: Seq[InternalRow]) => new GenericArrayData(rows) } @@ -522,9 +519,7 @@ case class JsonToStruct( lazy val parser = new JacksonParser( rowSchema, - new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get), - objectSupport = schema.isInstanceOf[StructType], - arraySupport = schema.isInstanceOf[ArrayType]) + new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) override def dataType: DataType = schema @@ -532,17 +527,22 @@ case class JsonToStruct( copy(timeZoneId = Option(timeZoneId)) override def nullSafeEval(json: Any): Any = { - // `null` related behavior of this expression is as below: // When input is, - // - `null`: the output is `null`. - // - invalid json: the output is `null`. - // - empty string: the output is `null`. - // - empty json object: the output is Row(`null`). - // - empty json array: the output is `Nil`. + // - `null`: `null`. + // - invalid json: `null`. + // - empty string: `null`. + // + // When the schema is array, + // - json array: `Array(Row(...), ...)` + // - json object: `Array(Row(...))` + // - empty json array: `Array()`. + // - empty json object: `Array(Row(null))`. 
// - // Note that, it returns `null` if the schema is not matched. If the schema is - // `StructType`, then the json string should be json objects. If the schema is - // array of structs, then the string should be json arrays. + // When the schema is a struct, + // - json object/array with single element: `Row(...)` + // - json array with multiple elements: `null` + // - empty json array: `null`. + // - empty json object: `Row(null)`. // We need `null` if the input string is an empty string. `JacksonParser` can // deal with this but produces `Nil`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 28de182446fa..9bdcb0e691ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -39,13 +39,7 @@ private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeE */ class JacksonParser( schema: StructType, - options: JSONOptions, - arraySupport: Boolean = true, - objectSupport: Boolean = true) extends Logging { - - // One of both should be true. Otherwise, this parser can't parse any json. - require(arraySupport || objectSupport) - + options: JSONOptions) extends Logging { import JacksonUtils._ import ParseModes._ import com.fasterxml.jackson.core.JsonToken._ @@ -171,7 +165,7 @@ class JacksonParser( val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter).toArray (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) { - case START_OBJECT if objectSupport => convertObject(parser, st, fieldConverters) :: Nil + case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil // SPARK-3308: support reading top level JSON arrays and take every element // in such an array as a row // @@ -185,7 +179,7 @@ class JacksonParser( // List([str_a_1,null]) // List([str_a_2,null], [null,str_b_3]) // - case START_ARRAY if arraySupport => + case START_ARRAY => val array = convertArray(parser, elementConverter) // Here, as we support reading top level JSON arrays and take every element // in such an array as a row, this case is possible. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index c9865d03ddc6..b66058759000 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -373,39 +373,55 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("from_json - array") { - val jsonDataArr = """[{"a": 1}, {"a": 2}]""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + + // json array: `Array(Row(...), ...)` + val jsonData1 = """[{"a": 1}, {"a": 2}]""" val expected = InternalRow.fromSeq(1 :: Nil) :: InternalRow.fromSeq(2 :: Nil) :: Nil checkEvaluation(JsonToStruct( - schema, Map.empty, Literal(jsonDataArr), gmtId), expected) + schema, Map.empty, Literal(jsonData1), gmtId), expected) - // Empty array produces empty collection. 
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal("[ ]"), gmtId), Nil) - } + // json object: `Array(Row(...))` + val jsonData2 = """{"a": 1}""" + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData2), gmtId), + InternalRow.fromSeq(1 :: Nil) :: Nil) - test("from_json empty object") { - val jsonData = """{ }""" - val schema = StructType(StructField("a", IntegerType) :: Nil) + // empty json array: `Array()`. + val jsonData3 = "[ ]" + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(jsonData3), gmtId), Nil) + + // empty json object: `Array(Row(null))`. + val jsonData4 = "{ }" checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), InternalRow(null)) + JsonToStruct(schema, Map.empty, Literal(jsonData4), gmtId), + InternalRow(null) :: Nil) } - test("from_json schema mismatch - struct") { - val jsonData = """[{"a" 1}]""" - val schema = new StructType().add("a", IntegerType) + test("from_json - struct") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + + // json object/array with single element: `Row(...)` + val jsonData1 = """[{"a": 1}]""" + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), InternalRow(1)) + // json array with multiple elements: `null` + val jsonData2 = """[{"a": 1}, {"a": 2}]""" checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null) - } + JsonToStruct(schema, Map.empty, Literal(jsonData2), gmtId), null) - test("from_json schema mismatch - array") { - val jsonData = """{"a" 1}""" - val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + // empty json array: `null`. + val jsonData3 = """[]""" + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData3), gmtId), null) + // empty json object: `Row(null)`. + val jsonData4 = """{ }""" checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null) + JsonToStruct(schema, Map.empty, Literal(jsonData4), gmtId), InternalRow(null)) } test("from_json null input column") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 0436ec2ff717..cbef5265225b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2953,43 +2953,47 @@ object functions { } /** - * (Scala-specific) Parses a column containing a JSON object string into a `StructType` with the + * (Scala-specific) Parses a column containing a JSON string into a `StructType` with the * specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON object data. - * @param schema the schema to use when parsing the json object string + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * * @group collection_funcs * @since 2.1.0 */ - def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { - JsonToStruct(schema, options, e.expr) - } + def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = + from_json(e, schema.asInstanceOf[DataType], options) /** - * (Scala-specific) Parses a column containing a JSON array string into a `ArrayType` with the - * specified schema. Returns `null`, in the case of an unparseable string. 
+ * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` + * with the specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON array data. - * @param schema the schema to use when parsing the json array string + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * * @group collection_funcs * @since 2.2.0 */ - def from_json(e: Column, schema: ArrayType, options: Map[String, String]): Column = withExpr { - JsonToStruct(schema, options, e.expr) + def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr { + schema match { + case _: StructType | ArrayType(_: StructType, _) => JsonToStruct(schema, options, e.expr) + case dt => + throw new IllegalArgumentException( + s"Input schema ${dt.simpleString} must be a struct or an array of structs.") + } } /** - * (Java-specific) Parses a column containing a JSON object string into a `StructType` with the + * (Java-specific) Parses a column containing a JSON string into a `StructType` with the * specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON object data. - * @param schema the schema to use when parsing the json object string + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * @@ -3000,26 +3004,26 @@ object functions { from_json(e, schema, options.asScala.toMap) /** - * (Java-specific) Parses a column containing a JSON array string into a `ArrayType` with the - * specified schema. Returns `null`, in the case of an unparseable string. + * (Java-specific) Parses a column containing a JSON object or array string into a `StructType` + * or `ArrayType` with the specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON array data. - * @param schema the schema to use when parsing the json array string + * @param e a string column containing JSON object or array data. + * @param schema the schema to use when parsing the json object or array string * @param options options to control how the json is parsed. accepts the same options and the * json data source. * * @group collection_funcs * @since 2.2.0 */ - def from_json(e: Column, schema: ArrayType, options: java.util.Map[String, String]): Column = + def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column = from_json(e, schema, options.asScala.toMap) /** - * Parses a column containing a JSON object string into a `StructType` with the specified schema. + * Parses a column containing a JSON string into a `StructType` with the specified schema. * Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON object data. - * @param schema the schema to use when parsing the json object string + * @param e a string column containing JSON data. 
+ * @param schema the schema to use when parsing the json string * * @group collection_funcs * @since 2.1.0 @@ -3028,37 +3032,30 @@ object functions { from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON array string into a `ArrayType` with the specified schema. - * Returns `null`, in the case of an unparseable string. + * Parses a column containing a JSON string into a `StructType` or `ArrayType` + * with the specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON array data. - * @param schema the schema to use when parsing the json array string + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string * * @group collection_funcs * @since 2.2.0 */ - def from_json(e: Column, schema: ArrayType): Column = + def from_json(e: Column, schema: DataType): Column = from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON object or array string into a `StructType` or `ArrayType` + * Parses a column containing a JSON string into a `StructType` or `ArrayType` * with the specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON object or array data. + * @param e a string column containing JSON data. * @param schema the schema to use when parsing the json string as a json string * * @group collection_funcs * @since 2.1.0 */ - def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = { - DataType.fromJson(schema) match { - case st: StructType => from_json(e, st, options) - case at @ ArrayType(_: StructType, _) => from_json(e, at, options) - case dt => - throw new IllegalArgumentException( - s"Input schema ${dt.simpleString} must be a struct or an array of structs.") - } - } + def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = + from_json(e, DataType.fromJson(schema), options) /** * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index e1fd64d3636e..cee74d0e776b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import scala.collection.JavaConverters._ - import org.apache.spark.sql.functions.{from_json, struct, to_json} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -139,7 +137,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { val df = Seq("""{"a" 1}""").toDS() val schema = ArrayType(StringType) val message = intercept[IllegalArgumentException] { - df.select(from_json($"value", schema.json, Map.empty[String, String].asJava)) + df.select(from_json($"value", schema)) }.getMessage assert(message.contains( @@ -164,10 +162,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer( df.select(from_json($"value", schema)), Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) - - checkAnswer( - df.select(from_json($"value", schema.json, Map.empty[String, String].asJava)), - Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) } test("to_json with option") { From 54e60bb149cd882c21856f19df0cf375c3ca3b20 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 28 
Feb 2017 15:29:18 +0900 Subject: [PATCH 5/8] Revert unrelated newline change and cleanup comments --- .../apache/spark/sql/catalyst/json/JacksonParser.scala | 1 + .../src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 9bdcb0e691ef..9b80c0fc87c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -40,6 +40,7 @@ private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeE class JacksonParser( schema: StructType, options: JSONOptions) extends Logging { + import JacksonUtils._ import ParseModes._ import com.fasterxml.jackson.core.JsonToken._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index cbef5265225b..052335a396eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3004,11 +3004,11 @@ object functions { from_json(e, schema, options.asScala.toMap) /** - * (Java-specific) Parses a column containing a JSON object or array string into a `StructType` - * or `ArrayType` with the specified schema. Returns `null`, in the case of an unparseable string. + * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` + * with the specified schema. Returns `null`, in the case of an unparseable string. * - * @param e a string column containing JSON object or array data. - * @param schema the schema to use when parsing the json object or array string + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string * @param options options to control how the json is parsed. accepts the same options and the * json data source. 
* From 9f1e96637cd6d67db0b5811daf2b33a9f49980a5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 28 Feb 2017 15:35:05 +0900 Subject: [PATCH 6/8] Relocate test --- .../apache/spark/sql/JsonFunctionsSuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index cee74d0e776b..08087671ecf5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -144,14 +144,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { "Input schema array must be a struct or an array of structs.")) } - test("to_json") { - val df = Seq(Tuple1(Tuple1(1))).toDF("a") - - checkAnswer( - df.select(to_json($"a")), - Row("""{"_1":1}""") :: Nil) - } - test("from_json array support") { val df = Seq("""[{"a": 1, "b": "a"}, {"a": 2}, { }]""").toDS() val schema = ArrayType( @@ -164,6 +156,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) } + test("to_json") { + val df = Seq(Tuple1(Tuple1(1))).toDF("a") + + checkAnswer( + df.select(to_json($"a")), + Row("""{"_1":1}""") :: Nil) + } + test("to_json with option") { val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") From 0c088bfc9469f5dc546f4d153ada609ad3b0b6ef Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 28 Feb 2017 15:48:59 +0900 Subject: [PATCH 7/8] Deduplicate type handling --- .../src/main/scala/org/apache/spark/sql/functions.scala | 7 +------ .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 052335a396eb..0e048aa4c05b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2980,12 +2980,7 @@ object functions { * @since 2.2.0 */ def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr { - schema match { - case _: StructType | ArrayType(_: StructType, _) => JsonToStruct(schema, options, e.expr) - case dt => - throw new IllegalArgumentException( - s"Input schema ${dt.simpleString} must be a struct or an array of structs.") - } + JsonToStruct(schema, options, e.expr) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 08087671ecf5..953d161ec2a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -136,7 +136,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("from_json invalid schema") { val df = Seq("""{"a" 1}""").toDS() val schema = ArrayType(StringType) - val message = intercept[IllegalArgumentException] { + val message = intercept[AnalysisException] { df.select(from_json($"value", schema)) }.getMessage From 3d490e34136ec76deaed15ebe8d6e7e8aac96776 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 4 Mar 2017 13:34:37 +0900 Subject: [PATCH 8/8] Address comments and clean up tests --- .../expressions/JsonExpressionsSuite.scala 
| 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index b66058759000..e3584909ddc4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -372,56 +372,60 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { ) } - test("from_json - array") { + test("from_json - input=array, schema=array, output=array") { + val input = """[{"a": 1}, {"a": 2}]""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val output = InternalRow(1) :: InternalRow(2) :: Nil + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // json array: `Array(Row(...), ...)` - val jsonData1 = """[{"a": 1}, {"a": 2}]""" - val expected = - InternalRow.fromSeq(1 :: Nil) :: - InternalRow.fromSeq(2 :: Nil) :: Nil - checkEvaluation(JsonToStruct( - schema, Map.empty, Literal(jsonData1), gmtId), expected) - - // json object: `Array(Row(...))` - val jsonData2 = """{"a": 1}""" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData2), gmtId), - InternalRow.fromSeq(1 :: Nil) :: Nil) + test("from_json - input=object, schema=array, output=array of single row") { + val input = """{"a": 1}""" + val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val output = InternalRow(1) :: Nil + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // empty json array: `Array()`. - val jsonData3 = "[ ]" - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(jsonData3), gmtId), Nil) + test("from_json - input=empty array, schema=array, output=empty array") { + val input = "[ ]" + val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val output = Nil + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // empty json object: `Array(Row(null))`. 
- val jsonData4 = "{ }" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData4), gmtId), - InternalRow(null) :: Nil) + test("from_json - input=empty object, schema=array, output=array of single row with null") { + val input = "{ }" + val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val output = InternalRow(null) :: Nil + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) } - test("from_json - struct") { + test("from_json - input=array of single object, schema=struct, output=single row") { + val input = """[{"a": 1}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) + val output = InternalRow(1) + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // json object/array with single element: `Row(...)` - val jsonData1 = """[{"a": 1}]""" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), InternalRow(1)) - - // json array with multiple elements: `null` - val jsonData2 = """[{"a": 1}, {"a": 2}]""" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData2), gmtId), null) + test("from_json - input=array, schema=struct, output=null") { + val input = """[{"a": 1}, {"a": 2}]""" + val schema = StructType(StructField("a", IntegerType) :: Nil) + val output = null + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // empty json array: `null`. - val jsonData3 = """[]""" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData3), gmtId), null) + test("from_json - input=empty array, schema=struct, output=null") { + val input = """[]""" + val schema = StructType(StructField("a", IntegerType) :: Nil) + val output = null + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + } - // empty json object: `Row(null)`. - val jsonData4 = """{ }""" - checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData4), gmtId), InternalRow(null)) + test("from_json - input=empty object, schema=struct, output=single row with null") { + val input = """{ }""" + val schema = StructType(StructField("a", IntegerType) :: Nil) + val output = InternalRow(null) + checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) } test("from_json null input column") {
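A minimal end-to-end sketch of the DataFrame-level behaviour this series introduces, assuming a local SparkSession and the patched `from_json` (Spark 2.2+ semantics); the object name, sample data, and column names below are illustrative, not part of the patches:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}

object FromJsonArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json array sketch").getOrCreate()
    import spark.implicits._

    // A top-level JSON array string parses into an array of structs with the patched from_json.
    val df = Seq("""[{"a": 1, "b": "a"}, {"a": 2}]""", """{"a" 1}""").toDF("value")
    val schema = ArrayType(new StructType().add("a", IntegerType).add("b", StringType))

    // The malformed second value ({"a" 1}) comes back as null; the first value becomes an
    // array of two rows, with the missing "b" field filled with null.
    df.select(from_json($"value", schema).alias("parsed")).show(truncate = false)

    spark.stop()
  }
}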