-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-33007][SQL] Simplify named_struct + get struct field + from_json expression chain #29942
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Kubernetes integration test starting |
|
Thank you for pining me, @viirya . |
|
cc @sunchao |
|
Kubernetes integration test status success |
| * The optimization includes: | ||
| * 1. JsonToStructs(StructsToJson(child)) => child. | ||
| * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs. | ||
| * 3 struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 -> 3.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. thanks.
|
Test build #129394 has finished for PR 29942 at commit
|
| case c: CreateNamedStruct | ||
| if c.valExprs.forall(v => v.isInstanceOf[GetStructField] && | ||
| v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) => | ||
| val jsonToStructs = c.valExprs.map(_.children(0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: _.children(0) -> _.children.head my IDE suggested.
| // alias field names. | ||
| if (semanticEqual && sameFieldName) { | ||
| val fromJson = jsonToStructs.head.asInstanceOf[JsonToStructs].copy(schema = c.dataType) | ||
| val nullFields = c.children.grouped(2).map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
map -> flatMap
| if c.valExprs.forall(v => v.isInstanceOf[GetStructField] && | ||
| v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) => | ||
| val jsonToStructs = c.valExprs.map(_.children(0)) | ||
| val semanticEqual = jsonToStructs.tail.forall(jsonToStructs.head.semanticEquals(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this check be merged with L39? https://github.com/apache/spark/pull/29942/files#diff-f9d27e3c9c32aaf07bb038c779309414R39
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can but L39 condition will look ugly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm I see. I noticed that it looped c.valExprs {3 x len(c.valExprs)} times to check the condition. Minor optimization though, I thought it would be nice if it could stop early if the condition not met.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let me change it and see how it looks like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the condition to top.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Show resolved
Hide resolved
|
Thanks for quick response. Addressed the comments. |
|
Kubernetes integration test starting |
|
Test build #129402 has finished for PR 29942 at commit
|
|
Test build #129404 has finished for PR 29942 at commit
|
|
Kubernetes integration test status success |
|
retest this please |
|
Kubernetes integration test starting |
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Kubernetes integration test status success |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Outdated
Show resolved
Hide resolved
|
@viirya just to clarify, is it to avoid calling the same |
|
Test build #129405 has finished for PR 29942 at commit
|
| * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs. | ||
| * 3. CreateNamedStruct(JsonToStructs(json).col1, JsonToStructs(json).col2, ...) => | ||
| * CreateNamedStruct(JsonToStructs(json)) if JsonToStructs(json) is shared among all | ||
| * fields of CreateNamedStruct. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a fresh eye with no context this is still a bit confusing - does the list col1, col2 etc have to represent all columns in the json struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it could be part of the json struct. In the case, we will prune unnecessary columns in JsonToStructs.
| .select(namedStruct( | ||
| "a1", GetStructField(JsonToStructs(schema, options, 'json), 0), | ||
| "b", GetStructField(JsonToStructs(schema, options, 'json), 1)).as("struct")) | ||
| val optimized2 = Optimizer.execute(query2.analyze) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems this is a bit repetitive - perhaps we can create a util method for the comparison? we can test evaluation in the method too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
| val duplicateFields = c.names.map(_.toString).distinct.length != c.names.length | ||
|
|
||
| // If we create struct from various fields of the same `JsonToStructs` and we don't | ||
| // alias field names and there is not duplicated fields in the struct. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "there is not duplicated fields" -> "there is no duplicated field"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
|
Thanks @dongjoon-hyun for pinging and left some comments @viirya (sorry some comments are stale so pls ignore them). |
This patch targets specifically for a special pattern Sometimes the query optimizer can optimize a query to have many duplicated expressions e.g. For SPARK-32939, because it was not reported by me, some details I might not get from its description. We don't de-duplicate expressions in whole-stage codegen overall (but only in specified operator). If we disable whole-stage codegen, interpreted Project will de-duplicate expressions for some cases ( |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #129420 has finished for PR 29942 at commit
|
|
No more comment and it looks okay. |
|
Thanks @maropu |
|
Kubernetes integration test starting |
|
No more comments from me too. I am okay with this given that we have a plan for related tickets (#29942 (comment)). |
|
Kubernetes integration test status success |
|
Thanks @HyukjinKwon |
|
Test build #129430 has finished for PR 29942 at commit
|
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Outdated
Show resolved
Hide resolved
|
Kubernetes integration test starting |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Show resolved
Hide resolved
|
Kubernetes integration test status failure |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
Show resolved
Hide resolved
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @viirya and all.
Merged to master for Apache Spark 3.1.0 on December 2020.
|
Thanks! |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #129469 has finished for PR 29942 at commit
|
|
Test build #129477 has finished for PR 29942 at commit
|
…on expression chain ### What changes were proposed in this pull request? This proposes to simplify named_struct + get struct field + from_json expression chain from `struct(from_json.col1, from_json.col2, from_json.col3...)` to `struct(from_json)`. ### Why are the changes needed? Simplify complex expression tree that could be produced by query optimization or user. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. Closes apache#29942 from viirya/SPARK-33007. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This proposes to simplify named_struct + get struct field + from_json expression chain from
struct(from_json.col1, from_json.col2, from_json.col3...)tostruct(from_json).Why are the changes needed?
Simplify complex expression tree that could be produced by query optimization or user.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test.