[SPARK-24391][SQL] Support arrays of any types by from_json #21439
Conversation
Test build #91189 has finished for PR 21439 at commit
Test build #91191 has finished for PR 21439 at commit
better to add tests in

Can we also accept primitive arrays in
    // can generate incorrect files if values are missing in columns declared as non-nullable.
    val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
    val unpackArray: Boolean = options.get("unpackArray").map(_.toBoolean).getOrElse(false)
private? (This is not related to this PR though; nullableSchema can also be private?)
Can you make the option unpackArray case-insensitive?
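A minimal, standalone sketch of how the option lookup could be made case-insensitive, assuming Spark's `CaseInsensitiveMap` from `org.apache.spark.sql.catalyst.util` (the same wrapper appears in a diff further down); the `options` value below is illustrative, not taken from the PR:

```
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Illustrative options map; in the expression the real map comes from the caller.
val options: Map[String, String] = Map("UNPACKARRAY" -> "true")

// Wrap the raw options so key lookups ignore case, then read the flag with a default.
val caseInsensitiveOptions = CaseInsensitiveMap(options)
val unpackArray: Boolean =
  caseInsensitiveOptions.get("unpackArray").map(_.toBoolean).getOrElse(false)
```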
If we add this new option here, I feel we'd better document it somewhere (e.g., sql/functions.scala)
Thank you @maropu for your review of the PR.
What kind of tests would you expect in
I believe it should be implemented in another PR because the changes required for
Test build #91350 has finished for PR 21439 at commit
retest this please.
Test build #91356 has finished for PR 21439 at commit
    override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
      case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
      case ArrayType(_: StructType, _) if unpackArray =>
Even if `unpackArray` is false, the next branch in line 558 still does `super.checkInputDataTypes()` for any `ArrayType`.
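A standalone sketch (not the Spark code; the simplified schema types are hypothetical) of the fall-through being pointed out: when the guard on the first case is false, an array schema still matches the next, more general branch. It can be pasted into a Scala REPL:

```
// Hypothetical simplified schema types, only for illustrating the match behavior.
sealed trait Schema
case object StructSchema extends Schema
case class ArraySchema(elementType: Schema) extends Schema

def check(schema: Schema, unpackArray: Boolean): String = schema match {
  // Guarded branch: taken only when unpackArray is true.
  case ArraySchema(StructSchema) if unpackArray => "unpack each element as a row"
  // Generic branch: still accepts any array when the guard above fails.
  case StructSchema | _: ArraySchema => "accepted by the generic branch"
}

// With unpackArray = false, an array of structs falls through to the generic branch.
println(check(ArraySchema(StructSchema), unpackArray = false))
```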
    private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
      val elemConverter = makeConverter(at.elementType)
      (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
        case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
In line 87:

    val array = convertArray(parser, elementConverter)
    // Here, as we support reading top level JSON arrays and take every element
    // in such an array as a row, this case is possible.
    if (array.numElements() == 0) {
      Nil
    } else {
      array.toArray[InternalRow](schema).toSeq
    }

Should we also follow this?
The code in line 87 returns null for the JSON input `[]` if the schema is `StructType(StructField("a", IntegerType) :: Nil)`. Let me explain why we should return null in that case: we extract a struct from the array, and if the array is empty there is nothing to extract, so we return null for that nothing.
In the case when the schema is `ArrayType(...)`, I believe we should return an empty array for the empty JSON array `[]`.
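A hypothetical spark-shell sketch of the two cases described above; the expected results in the comments follow the reasoning of this comment, they are not output captured from the PR:

```
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
scala> val df = Seq("[]").toDF("a")

scala> // Struct schema: nothing can be extracted from an empty array, so the result is null.
scala> df.select(from_json($"a", StructType(StructField("a", IntegerType) :: Nil))).show()

scala> // Array schema: the natural result is an empty array rather than null.
scala> df.select(from_json($"a", ArrayType(IntegerType))).show()
```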
    val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
    private val caseInsensitiveOptions = CaseInsensitiveMap(options)
    private val unpackArray: Boolean = {
Why do we need this? Can you add comments about it?
    val output = InternalRow(1) :: Nil
    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
    checkEvaluation(
      JsonToStructs(schema, Map("unpackArray" -> "true"), Literal(input), gmtId, true),
Add a case for `unpackArray` as false.
IIUC that's because we need SQL parser tests for these kinds of SQL-related functionality.
Test build #91668 has finished for PR 21439 at commit
# Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
Test build #93734 has finished for PR 21439 at commit
Is there anything that blocks the PR for now?

@gatorsmile @HyukjinKwon May I ask you to look at the PR one more time?

@HyukjinKwon Is there any chance the PR will be merged, or should I close it?

@gatorsmile Could you look at the PR, please?
    case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
    case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
      // This handles the case when an input JSON object is a structure but
      // the specified schema is an array of structures. In that case, the input JSON is
Could you add an example here, like what we did in `makeStructRootConverter`?
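A hypothetical spark-shell sketch of the case the comment above refers to: the input is a single JSON object, but the requested schema is an array of structs, so the object is treated as an array containing that one object (the schema and column names are assumptions for illustration):

```
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
scala> val df = Seq("""{"a": 1}""").toDF("c")

scala> // The single object becomes the only element of the resulting array of structs.
scala> df.select(from_json($"c", ArrayType(StructType(StructField("a", IntegerType) :: Nil)))).show()
```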
    select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));

    -- from_json - array type
    select from_json('[1, 2, 3]', 'array<int>');
Add more cases?

    select from_json('[3, null, 4]', 'array<int>')
    select from_json('[3, "str", 4]', 'array<int>')
added
LGTM
Test build #94655 has finished for PR 21439 at commit
HyukjinKwon left a comment
LGTM
retest this please
Test build #94659 has finished for PR 21439 at commit
retest this please.

LGTM too.
Test build #94666 has finished for PR 21439 at commit
retest this please
Test build #94677 has finished for PR 21439 at commit
retest this please
Test build #94680 has finished for PR 21439 at commit
Merged to master.
I think the R side has not been updated for this yet. @huaxingao would you like to do that?
Sure. I will work on it. Thanks for letting me know. @viirya |
The PR removes the restriction on element types of an array used as the root type in `from_json`. Currently, the function can handle only arrays of structs; even an array of primitive types is disallowed. The PR allows arrays of any types currently supported by the JSON datasource. Here is an example with an array of a primitive type:
```
scala> import org.apache.spark.sql.functions._
scala> val df = Seq("[1, 2, 3]").toDF("a")
scala> val schema = new ArrayType(IntegerType, false)
scala> val arr = df.select(from_json($"a", schema))
scala> arr.printSchema
root
|-- jsontostructs(a): array (nullable = true)
| |-- element: integer (containsNull = true)
```
and the result of converting the JSON string to the `ArrayType`:
```
scala> arr.show
+----------------+
|jsontostructs(a)|
+----------------+
| [1, 2, 3]|
+----------------+
```
I added a few positive and negative tests:
- array of primitive types
- array of arrays
- array of structs
- array of maps
Closes apache#21439 from MaxGekk/from_json-array.
Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
What changes were proposed in this pull request?

The PR removes the restriction on element types of an array used as the root type in `from_json`; arrays of any types currently supported by the JSON datasource are now allowed (see the description and example above).

How was this patch tested?

I added a few positive and negative tests: arrays of primitive types, arrays of arrays, arrays of structs, and arrays of maps.