
Commit ab06c25

MaxGekk authored and HyukjinKwon committed
[SPARK-24391][SQL] Support arrays of any types by from_json
## What changes were proposed in this pull request?

The PR removes the restriction on element types of an array used as the root type in `from_json`. Currently, the function can handle only arrays of structs; even an array of a primitive type is disallowed. The PR allows arrays of any type currently supported by the JSON datasource. Here is an example with an array of a primitive type:

```
scala> import org.apache.spark.sql.functions._
scala> val df = Seq("[1, 2, 3]").toDF("a")
scala> val schema = new ArrayType(IntegerType, false)
scala> val arr = df.select(from_json($"a", schema))
scala> arr.printSchema
root
 |-- jsontostructs(a): array (nullable = true)
 |    |-- element: integer (containsNull = true)
```

and the result of converting the JSON string to the `ArrayType`:

```
scala> arr.show
+----------------+
|jsontostructs(a)|
+----------------+
|       [1, 2, 3]|
+----------------+
```

## How was this patch tested?

I added a few positive and negative tests for:
- arrays of primitive types
- arrays of arrays
- arrays of structs
- arrays of maps

Closes #21439 from MaxGekk/from_json-array.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
1 parent b270bcc · commit ab06c25

File tree: 7 files changed, +194 additions, −26 deletions


python/pyspark/sql/functions.py

Lines changed: 6 additions & 1 deletion
@@ -2241,7 +2241,7 @@ def json_tuple(col, *fields):
 def from_json(col, schema, options={}):
     """
     Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
-    as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
+    as keys type, :class:`StructType` or :class:`ArrayType` with
     the specified schema. Returns `null`, in the case of an unparseable string.

     :param col: string column in json format
@@ -2269,6 +2269,11 @@ def from_json(col, schema, options={}):
     >>> schema = schema_of_json(lit('''{"a": 0}'''))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> data = [(1, '''[1, 2, 3]''')]
+    >>> schema = ArrayType(IntegerType())
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=[1, 2, 3])]
     """

     sc = SparkContext._active_spark_context

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 6 additions & 13 deletions
@@ -495,7 +495,7 @@ case class JsonTuple(children: Seq[Expression])
 }

 /**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
+ * Converts an json input string to a [[StructType]], [[ArrayType]] or [[MapType]]
  * with the specified schema.
  */
 // scalastyle:off line.size.limit
@@ -544,34 +544,27 @@ case class JsonToStructs(
     timeZoneId = None)

   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-    case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
+    case _: StructType | _: ArrayType | _: MapType =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${nullableSchema.catalogString} must be a struct or an array of structs.")
-  }
-
-  @transient
-  lazy val rowSchema = nullableSchema match {
-    case st: StructType => st
-    case ArrayType(st: StructType, _) => st
-    case mt: MapType => mt
+      s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }

   // This converts parsed rows to the desired output by the given schema.
   @transient
   lazy val converter = nullableSchema match {
     case _: StructType =>
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-    case ArrayType(_: StructType, _) =>
-      (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+    case _: ArrayType =>
+      (rows: Seq[InternalRow]) => rows.head.getArray(0)
     case _: MapType =>
       (rows: Seq[InternalRow]) => rows.head.getMap(0)
   }

   @transient
   lazy val parser =
     new JacksonParser(
-      rowSchema,
+      nullableSchema,
       new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

   override def dataType: DataType = nullableSchema
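
Taken together, these changes mean `checkInputDataTypes` now accepts any array root schema, the `JacksonParser` receives the full `nullableSchema` instead of a pre-extracted `rowSchema`, and the converter unwraps the parsed array from the single returned row via `rows.head.getArray(0)`. A rough sketch of the user-visible effect, assuming a `spark-shell` session (so `spark.implicits._` is in scope); the values are illustrative only:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Before this change, an ArrayType root with a non-struct element type
// failed analysis; now it parses directly to an array column.
val df = Seq("[1, 2, 3]").toDF("value")
df.select(from_json($"value", ArrayType(IntegerType))).show()
// +--------------------+
// |jsontostructs(value)|
// +--------------------+
// |           [1, 2, 3]|
// +--------------------+

// Malformed input still degrades to null instead of throwing.
Seq("[1, 2").toDF("value")
  .select(from_json($"value", ArrayType(IntegerType)))
  .show()
```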

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 30 additions & 0 deletions
@@ -61,6 +61,7 @@ class JacksonParser(
     dt match {
       case st: StructType => makeStructRootConverter(st)
       case mt: MapType => makeMapRootConverter(mt)
+      case at: ArrayType => makeArrayRootConverter(at)
     }
   }

@@ -101,6 +102,35 @@ class JacksonParser(
     }
   }

+  private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
+    val elemConverter = makeConverter(at.elementType)
+    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
+      case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
+      case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
+        // This handles the case when an input JSON object is a structure but
+        // the specified schema is an array of structures. In that case, the input JSON is
+        // considered as an array of only one element of struct type.
+        // This behavior was introduced by changes for SPARK-19595.
+        //
+        // For example, if the specified schema is ArrayType(new StructType().add("i", IntegerType))
+        // and JSON input as below:
+        //
+        // [{"i": 1}, {"i": 2}]
+        // [{"i": 3}]
+        // {"i": 4}
+        //
+        // The last row is considered as an array with one element, and result of conversion:
+        //
+        // Seq(Row(1), Row(2))
+        // Seq(Row(3))
+        // Seq(Row(4))
+        //
+        val st = at.elementType.asInstanceOf[StructType]
+        val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+        Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
+    }
+  }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema.
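
The `START_OBJECT` branch above preserves the SPARK-19595 behavior: a bare JSON object is accepted under an array-of-structs schema and wrapped into a one-element array. A short sketch of that path through the public API, again assuming a `spark-shell` session:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = ArrayType(new StructType().add("i", IntegerType))

// The top-level object {"i": 4} is treated as the single-element
// array [{"i": 4}], matching the Seq(Row(4)) case in the comment above.
val result = Seq("""{"i": 4}""").toDF("value")
  .select(from_json($"value", schema).alias("arr"))
  .selectExpr("arr[0].i")

result.show()  // expected to print a single row containing 4
```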

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 5 additions & 5 deletions
@@ -3339,7 +3339,7 @@ object functions {

   /**
    * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3371,7 +3371,7 @@ object functions {

   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3400,7 +3400,7 @@ object functions {

   /**
    * Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
-   * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3414,7 +3414,7 @@ object functions {

   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3431,7 +3431,7 @@ object functions {

   /**
    * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
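
Since the scaladoc now promises `ArrayType` support in general, here is a usage sketch with a non-struct element type, an array of maps, assuming a `spark-shell` session; the `DataType`-based `from_json` overload shown is the existing public API:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Each JSON object in the array becomes a map value, so objects with
// different keys ("a" vs. "b") can share one schema.
val df = Seq("""[{"a": 1}, {"b": 2}]""").toDF("value")
df.select(from_json($"value", ArrayType(MapType(StringType, IntegerType))))
  .show(truncate = false)
```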

sql/core/src/test/resources/sql-tests/inputs/json-functions.sql

Lines changed: 12 additions & 0 deletions
@@ -39,3 +39,15 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
 -- infer schema of json literal
 select schema_of_json('{"c1":0, "c2":[1]}');
 select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
+
+-- from_json - array type
+select from_json('[1, 2, 3]', 'array<int>');
+select from_json('[1, "2", 3]', 'array<int>');
+select from_json('[1, 2, null]', 'array<int>');
+
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>');
+select from_json('{"a": 1}', 'array<struct<a:int>>');
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>');
+
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out

Lines changed: 65 additions & 1 deletion
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 30
+-- Number of queries: 38


 -- !query 0
@@ -290,3 +290,67 @@ select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'))
 struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
 -- !query 29 output
 {"c1":[1,2,3]}
+
+
+-- !query 30
+select from_json('[1, 2, 3]', 'array<int>')
+-- !query 30 schema
+struct<jsontostructs([1, 2, 3]):array<int>>
+-- !query 30 output
+[1,2,3]
+
+
+-- !query 31
+select from_json('[1, "2", 3]', 'array<int>')
+-- !query 31 schema
+struct<jsontostructs([1, "2", 3]):array<int>>
+-- !query 31 output
+NULL
+
+
+-- !query 32
+select from_json('[1, 2, null]', 'array<int>')
+-- !query 32 schema
+struct<jsontostructs([1, 2, null]):array<int>>
+-- !query 32 output
+[1,2,null]
+
+
+-- !query 33
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>')
+-- !query 33 schema
+struct<jsontostructs([{"a": 1}, {"a":2}]):array<struct<a:int>>>
+-- !query 33 output
+[{"a":1},{"a":2}]
+
+
+-- !query 34
+select from_json('{"a": 1}', 'array<struct<a:int>>')
+-- !query 34 schema
+struct<jsontostructs({"a": 1}):array<struct<a:int>>>
+-- !query 34 output
+[{"a":1}]
+
+
+-- !query 35
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>')
+-- !query 35 schema
+struct<jsontostructs([null, {"a":2}]):array<struct<a:int>>>
+-- !query 35 output
+[null,{"a":2}]
+
+
+-- !query 36
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>')
+-- !query 36 schema
+struct<jsontostructs([{"a": 1}, {"b":2}]):array<map<string,int>>>
+-- !query 36 output
+[{"a":1},{"b":2}]
+
+
+-- !query 37
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
+-- !query 37 schema
+struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
+-- !query 37 output
+NULL

sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

Lines changed: 70 additions & 6 deletions
@@ -133,15 +133,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(null) :: Nil)
   }

-  test("from_json invalid schema") {
+  test("from_json - json doesn't conform to the array type") {
     val df = Seq("""{"a" 1}""").toDS()
     val schema = ArrayType(StringType)
-    val message = intercept[AnalysisException] {
-      df.select(from_json($"value", schema))
-    }.getMessage

-    assert(message.contains(
-      "Input schema array<string> must be a struct or an array of structs."))
+    checkAnswer(df.select(from_json($"value", schema)), Seq(Row(null)))
   }

   test("from_json array support") {
@@ -405,4 +401,72 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

     assert(out.schema == expected)
   }
+
+  test("from_json - array of primitive types") {
+    val df = Seq("[1, 2, 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3))))
+  }
+
+  test("from_json - array of primitive types - malformed row") {
+    val df = Seq("[1, 2 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null)))
+  }
+
+  test("from_json - array of arrays") {
+    val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0][0], json[1][1], json[2][2] from jsonTable"),
+      Seq(Row(1, 3, 6)))
+  }
+
+  test("from_json - array of arrays - malformed row") {
+    val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null)))
+  }
+
+  test("from_json - array of structs") {
+    val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0], json[1], json[2] from jsonTable"),
+      Seq(Row(Row(1), Row(2), Row(3))))
+  }
+
+  test("from_json - array of structs - malformed row") {
+    val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0], json[1] from jsonTable"), Seq(Row(null, null)))
+  }
+
+  test("from_json - array of maps") {
+    val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("""select json[0], json[1] from jsonTable"""),
+      Seq(Row(Map("a" -> 1), Map("b" -> 2))))
+  }
+
+  test("from_json - array of maps - malformed row") {
+    val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
+  }
 }
