Closed

29 commits
3a7559b
Support arrays by from_json
MaxGekk May 26, 2018
b601a93
Fix comments
MaxGekk May 26, 2018
02a97ac
Support array of struct unpacking for backward compatibility
MaxGekk May 26, 2018
f34bd88
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk May 27, 2018
f062dd2
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk May 31, 2018
86d2f20
Added case insensitive options for jsonToStruct
MaxGekk May 31, 2018
9d0230a
Making added values private
MaxGekk May 31, 2018
181dcae
Unnecessary check of input params is removed
MaxGekk Jun 11, 2018
6d54cf0
Added comment for the unpackArray config
MaxGekk Jun 11, 2018
e321e37
Added a test when schema is array type but json input is {}
MaxGekk Jun 11, 2018
1c657e8
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jun 14, 2018
b5b0d9c
Making imports shorter
MaxGekk Jun 14, 2018
ce9918b
SQL tests for arrays
MaxGekk Jun 14, 2018
fced8ec
Enable unpackArray by default to keep backward compatibility
MaxGekk Jun 29, 2018
0fd0fb9
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 7, 2018
e49ee9d
Updating of sql tests
MaxGekk Jul 7, 2018
2bca7e0
Fix python tests
MaxGekk Jul 7, 2018
f3efb1b
Merge branch 'from_json-array' of github.com:MaxGekk/spark-1 into fro…
MaxGekk Jul 7, 2018
82d4fd5
Merge branch 'master' into from_json-array
MaxGekk Jul 13, 2018
758d1df
Fix tests - removing unused parameter
MaxGekk Jul 13, 2018
8349ca8
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 21, 2018
2746d35
Removing unpackArray option
MaxGekk Jul 21, 2018
bc3a2dd
Removing unused val
MaxGekk Jul 21, 2018
39a0a4e
Reverting unrelated changes
MaxGekk Jul 24, 2018
9c2681a
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 28, 2018
021350b
Addressing Liang-Chi Hsieh's review comments
MaxGekk Jul 28, 2018
89719c0
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Aug 12, 2018
bdfd8a1
Added an example
MaxGekk Aug 12, 2018
74a7799
A few negative SQL tests
MaxGekk Aug 12, 2018
7 changes: 6 additions & 1 deletion python/pyspark/sql/functions.py
@@ -2241,7 +2241,7 @@ def json_tuple(col, *fields):
def from_json(col, schema, options={}):
"""
Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
as keys type, :class:`StructType` or :class:`ArrayType` with
the specified schema. Returns `null`, in the case of an unparseable string.

:param col: string column in json format
@@ -2269,6 +2269,11 @@ def from_json(col, schema, options={}):
>>> schema = schema_of_json(lit('''{"a": 0}'''))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
>>> data = [(1, '''[1, 2, 3]''')]
>>> schema = ArrayType(IntegerType())
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=[1, 2, 3])]
"""

sc = SparkContext._active_spark_context
@@ -495,7 +495,7 @@ case class JsonTuple(children: Seq[Expression])
}

/**
* Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
* Converts an json input string to a [[StructType]], [[ArrayType]] or [[MapType]]
* with the specified schema.
*/
// scalastyle:off line.size.limit
@@ -544,34 +544,27 @@ case class JsonToStructs(
timeZoneId = None)
@viirya (Member), Jul 24, 2018:

Please also update the comment of JsonToStructs:

> Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s.


override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
case _: StructType | _: ArrayType | _: MapType =>
super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"Input schema ${nullableSchema.catalogString} must be a struct or an array of structs.")
s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
}

@transient
lazy val rowSchema = nullableSchema match {
case st: StructType => st
case ArrayType(st: StructType, _) => st
case mt: MapType => mt
}

// This converts parsed rows to the desired output by the given schema.
@transient
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
case ArrayType(_: StructType, _) =>
(rows: Seq[InternalRow]) => new GenericArrayData(rows)
case _: ArrayType =>
(rows: Seq[InternalRow]) => rows.head.getArray(0)
case _: MapType =>
(rows: Seq[InternalRow]) => rows.head.getMap(0)
}

@transient
lazy val parser =
new JacksonParser(
rowSchema,
nullableSchema,
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

override def dataType: DataType = nullableSchema
@@ -61,6 +61,7 @@ class JacksonParser(
dt match {
case st: StructType => makeStructRootConverter(st)
case mt: MapType => makeMapRootConverter(mt)
case at: ArrayType => makeArrayRootConverter(at)
Member:

This change accepts the json datasource form that the master can't parse? If so, I think we need tests in JsonSuite, too. cc: @HyukjinKwon

@MaxGekk (Member Author):

> This change accepts the json datasource form that the master can't parse?

Right, it accepts arrays of any type, whereas master accepts only arrays of structs.

> If so, I think we need tests in JsonSuite ...

I added a few tests to JsonSuite, or do you mean some concrete test case?

Member:

You've already added tests in JsonSuite? It seems there are tests in JsonFunctionsSuite, JsonExpressionSuite, and SQLQueryTestSuite now?

@MaxGekk (Member Author):

Sorry, I didn't catch that you meant a specific class. Just in case, what is the reason for adding tests to JsonSuite? I changed the behavior of a function, so JsonFunctionsSuite is the perfect place for new tests, I believe. At the moment, you cannot specify a schema for JSON objects different from StructType in DataFrameReader. In this way, an ArrayType can come into JacksonParser only from the JSON functions (from_json). I could add a test to JsonSuite similar to the one in JsonFunctionsSuite, but it doesn't make any sense from my point of view.

Member:

Ah, ok. You touched the JacksonParser.scala file, so I thought there were some behaviour changes in the JSON datasource. But I notice that there are not, so the current tests are enough. Thanks.

}
}
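For reference, a minimal sketch of the root-level array parsing this new dispatch enables via from_json (assuming a spark-shell session, so spark.implicits._ is in scope; illustration only, not part of this diff):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// Previously only ArrayType(StructType) was accepted as a root-level array
// schema; with this change the element type may be any supported type.
val df = Seq("[1, 2, 3]").toDF("value")
df.select(from_json($"value", ArrayType(IntegerType)).as("ints")).show()
// +---------+
// |     ints|
// +---------+
// |[1, 2, 3]|
// +---------+
```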

@@ -101,6 +102,35 @@ class JacksonParser(
}
}

private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
val elemConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
Member:

In line 87:

        val array = convertArray(parser, elementConverter)
        // Here, as we support reading top level JSON arrays and take every element
        // in such an array as a row, this case is possible.
        if (array.numElements() == 0) {
          Nil
        } else {
          array.toArray[InternalRow](schema).toSeq
        }

Should we also follow this?

@MaxGekk (Member Author):

The code at line 87 returns null for the JSON input [] if the schema is StructType(StructField("a", IntegerType) :: Nil). I would explain why we should return null in that case: we extract a struct from the array, and if the array is empty, there is nothing to extract, so we return null for that nothing.

In the case when the schema is ArrayType(...), I believe we should return an empty array for the empty JSON array [].
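A short sketch of the distinction described above (same spark-shell assumptions as before; the commented results are the expected ones, not copied from a run):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val empty = Seq("[]").toDF("value")

// Struct schema: there is no struct to extract from an empty array, so null.
empty.select(from_json($"value", new StructType().add("a", IntegerType))).show()
// null

// Array schema: an empty JSON array maps naturally to an empty Spark array.
empty.select(from_json($"value", ArrayType(IntegerType))).show()
// []
```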

case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
Contributor:

Why do we need this? This means that if the user asks for an array of data and the data doesn't contain an array, we return an array with a single element. This seems wrong to me. I'd rather return null or throw an exception.

@MaxGekk (Member Author), Jul 22, 2018:

This is for backward compatibility with the behavior introduced by PR #16929, most likely by this change: https://github.com/apache/spark/pull/16929/files#diff-6626026091295ad8c0dfb66ecbcd04b1R506. A special test was even added: https://github.com/apache/spark/pull/16929/files#diff-88230f171af0b7a40791a867f9dd3a36R382. Please ask the author @HyukjinKwon about the reasons for the changes.

@HyukjinKwon (Member), Jul 23, 2018:

If an array of data is empty and the schema is an array, it should return an empty array: https://github.com/apache/spark/pull/16929/files#diff-88230f171af0b7a40791a867f9dd3a36R389

Returning an empty object or array was not even introduced by me in the JSON datasource anyway.

@maropu (Member), Jul 23, 2018:

It is a super weird case... Can we put this special handling code for backward compatibility in JsonToStructs? Either way, I think we should leave code comments here so that others can understand it easily.

Contributor:

I think what was done in that PR is pretty different; it is actually the opposite of what we are doing here. There, we return an array of structs when a struct is specified as the schema and the JSON contains an array. Here, we return an array with one struct when the schema is an array of structs and there is a struct instead of an array.

Although I don't really like the behavior introduced in the PR you mentioned, I can understand it, as it was a way to support arrays of structs (the only one at the moment), and I don't think we can change it before 3.0, at least for backward compatibility. But since here we are introducing a new behavior, if an array is required and a struct is found, I think returning an array with one element is wrong/unexpected behavior, and returning null would be what I'd expect as a user.

@MaxGekk (Member Author):

I can only say that this case START_OBJECT was added to handle the case when a user specified the schema as an array of structs and a struct is found in the input JSON. See the existing test case which I pointed out above: https://github.com/apache/spark/pull/16929/files#diff-88230f171af0b7a40791a867f9dd3a36R382. I don't want to change that behavior in this PR and potentially break users' apps. What I would propose is to put the functionality under a spark.sql.legacy.* flag, which could be deleted in Spark 3.0.

@maropu Initially I put the behavior under a flag in JsonToStructs, but Reynold asked me to support the new behavior without any new sticks/flags.

Member:

Shall we add a comment on top of this case to explain it?

@MaxGekk (Member Author):

I am adding a comment for this.

// This handles the case when an input JSON object is a structure but
// the specified schema is an array of structures. In that case, the input JSON is
Member:

Could you add an example here, like what we did in makeStructRootConverter?

// considered as an array of only one element of struct type.
// This behavior was introduced by changes for SPARK-19595.
//
// For example, if the specified schema is ArrayType(new StructType().add("i", IntegerType))
// and JSON input as below:
//
// [{"i": 1}, {"i": 2}]
// [{"i": 3}]
// {"i": 4}
//
// The last row is considered as an array with one element, and result of conversion:
//
// Seq(Row(1), Row(2))
// Seq(Row(3))
// Seq(Row(4))
//
val st = at.elementType.asInstanceOf[StructType]
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
}
}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
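To illustrate the START_OBJECT branch above, a sketch of the backward-compatible wrapping (again assuming a spark-shell session; the shown output is approximate):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val schema = ArrayType(new StructType().add("i", IntegerType))
val df = Seq("""[{"i": 1}, {"i": 2}]""", """{"i": 3}""").toDF("value")

// The bare object in the second row is wrapped into a single-element array,
// matching the SPARK-19595 behavior described in the code comment.
df.select(from_json($"value", schema).as("json")).show()
// [[1], [2]]
// [[3]]
```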
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3339,7 +3339,7 @@ object functions {

/**
* (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
* as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3371,7 +3371,7 @@

/**
* (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
* as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3400,7 +3400,7 @@

/**
* Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
* `StructType` or `ArrayType` of `StructType`s with the specified schema.
* `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3414,7 +3414,7 @@

/**
* (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
* as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3431,7 +3431,7 @@

/**
* (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
* as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
12 changes: 12 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -39,3 +39,15 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
-- infer schema of json literal
select schema_of_json('{"c1":0, "c2":[1]}');
select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));

-- from_json - array type
select from_json('[1, 2, 3]', 'array<int>');
Member:

Add more cases?
select from_json('[3, null, 4]', 'array<int>')
select from_json('[3, "str", 4]', 'array<int>')

@MaxGekk (Member Author):

Added.

select from_json('[1, "2", 3]', 'array<int>');
select from_json('[1, 2, null]', 'array<int>');

select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>');
select from_json('{"a": 1}', 'array<struct<a:int>>');
select from_json('[null, {"a":2}]', 'array<struct<a:int>>');

select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 30
-- Number of queries: 38


-- !query 0
@@ -290,3 +290,67 @@ select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'))
struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
-- !query 29 output
{"c1":[1,2,3]}


-- !query 30
select from_json('[1, 2, 3]', 'array<int>')
-- !query 30 schema
struct<jsontostructs([1, 2, 3]):array<int>>
-- !query 30 output
[1,2,3]


-- !query 31
select from_json('[1, "2", 3]', 'array<int>')
-- !query 31 schema
struct<jsontostructs([1, "2", 3]):array<int>>
-- !query 31 output
NULL


-- !query 32
select from_json('[1, 2, null]', 'array<int>')
-- !query 32 schema
struct<jsontostructs([1, 2, null]):array<int>>
-- !query 32 output
[1,2,null]


-- !query 33
select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>')
-- !query 33 schema
struct<jsontostructs([{"a": 1}, {"a":2}]):array<struct<a:int>>>
-- !query 33 output
[{"a":1},{"a":2}]


-- !query 34
select from_json('{"a": 1}', 'array<struct<a:int>>')
-- !query 34 schema
struct<jsontostructs({"a": 1}):array<struct<a:int>>>
-- !query 34 output
[{"a":1}]


-- !query 35
select from_json('[null, {"a":2}]', 'array<struct<a:int>>')
-- !query 35 schema
struct<jsontostructs([null, {"a":2}]):array<struct<a:int>>>
-- !query 35 output
[null,{"a":2}]


-- !query 36
select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>')
-- !query 36 schema
struct<jsontostructs([{"a": 1}, {"b":2}]):array<map<string,int>>>
-- !query 36 output
[{"a":1},{"b":2}]


-- !query 37
select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
-- !query 37 schema
struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
-- !query 37 output
NULL
@@ -133,15 +133,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(null) :: Nil)
}

test("from_json invalid schema") {
test("from_json - json doesn't conform to the array type") {
val df = Seq("""{"a" 1}""").toDS()
val schema = ArrayType(StringType)
val message = intercept[AnalysisException] {
df.select(from_json($"value", schema))
}.getMessage

assert(message.contains(
"Input schema array<string> must be a struct or an array of structs."))
checkAnswer(df.select(from_json($"value", schema)), Seq(Row(null)))
}

test("from_json array support") {
@@ -405,4 +401,72 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

assert(out.schema == expected)
}

test("from_json - array of primitive types") {
val df = Seq("[1, 2, 3]").toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3))))
}

test("from_json - array of primitive types - malformed row") {
val df = Seq("[1, 2 3]").toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null)))
}

test("from_json - array of arrays") {
val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("select json[0][0], json[1][1], json[2][2] from jsonTable"),
Seq(Row(1, 3, 6)))
}

test("from_json - array of arrays - malformed row") {
val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null)))
}

test("from_json - array of structs") {
val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("select json[0], json[1], json[2] from jsonTable"),
Seq(Row(Row(1), Row(2), Row(3))))
}

test("from_json - array of structs - malformed row") {
val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("select json[0], json[1]from jsonTable"), Seq(Row(null, null)))
}

test("from_json - array of maps") {
val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("""select json[0], json[1] from jsonTable"""),
Seq(Row(Map("a" -> 1), Map("b" -> 2))))
}

test("from_json - array of maps - malformed row") {
val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
}
}