-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-33427][SQL] Add subexpression elimination for interpreted expression evaluation #30341
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main code change should be ready for review. I marked it as WIP because I'd like to add some tests later.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
GitHub Actions were passed actually. |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Thank you for pinging me, @viirya . |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
|
Test build #130955 has finished for PR 30341 at commit
|
This comment has been minimized.
This comment has been minimized.
|
Kubernetes integration test starting |
|
retest this please |
|
Kubernetes integration test status success |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
Yea, I don't have any more comment, so I'll leave this to @cloud-fan . |
|
Test build #131107 has finished for PR 30341 at commit
|
| subexpressionElimination off, codegen on 25932 26908 916 0.0 259320042.3 1.0X | ||
| subexpressionElimination off, codegen off 26085 26159 65 0.0 260848905.0 1.0X | ||
| subexpressionElimination on, codegen on 2860 2939 72 0.0 28603312.9 9.1X | ||
| subexpressionElimination on, codegen off 2517 2617 93 0.0 25165157.7 10.3X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this additional 10.3x.
| private def replaceWithProxy( | ||
| expr: Expression, | ||
| proxyMap: Map[Expression, ExpressionProxy]): Expression = { | ||
| proxyMap.getOrElse(expr, expr.mapChildren(replaceWithProxy(_, proxyMap))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a top-down traverse, which means we will recursively replace expr with proxy even for ExpressionProxy. Is it expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we replace one expression with ExpressionProxy, we stop traversing down. We only traverse down to children if cannot find current expression in proxyMap. Is this for your question?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry I misread the code. You are right.
|
|
||
| expressions.foreach(equivalentExpressions.addExprTree(_)) | ||
|
|
||
| val proxyMap = mutable.Map.empty[Expression, ExpressionProxy] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it OK to use a simple map here? Two expressions may not equal to each other even if they semantically equal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For semantically equal exprs, we put a pair of expr -> proxy into the map and note the proxy is the same. So later we traverse down into expressions, we look at the map. We don't do semantically comparing when looking at this map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I get it now. Seems we can use IdentityHashMap to be more explicit.
| // ( (one * two) * (one * two) ) | ||
| assert(proxys.size == 2) | ||
| val expected = ExpressionProxy(mul2, runtime) | ||
| assert(proxys.head == expected) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be proxys.forall(_ == expected)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, you're right.
| }) | ||
| assert(proxys.isEmpty) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we test attributes?
val attr1 = AttributeReference("a", ...)
val attr2 = attr1.withName("A")
To make sure 2 semantically-equal attributes can be optimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EquivalentExpressions skips for LeafExpression. So attributes won't be counted for subexpression.
...lyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
Show resolved
Hide resolved
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Test build #131184 has finished for PR 30341 at commit
|
|
thanks, merging to master! |
|
Thank you, @viirya , @maropu , @cloud-fan ! |
|
Thanks @dongjoon-hyun @maropu @cloud-fan |
|
Sorry for a late comment. +1, nice. |
|
Thanks @HyukjinKwon! |
…equantially ### What changes were proposed in this pull request? This follow-up fixes an issue when inserting key/value pairs into `IdentityHashMap` in `SubExprEvaluationRuntime`. ### Why are the changes needed? The last commits to #30341 follows review comment to use `IdentityHashMap`. Because we leverage `IdentityHashMap` to compare keys in reference, we should not convert expression pairs to Scala map before inserting. Scala map compares keys by equality so we will loss keys with different references. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Run benchmark to verify. Closes #30459 from viirya/SPARK-33427-map. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
What changes were proposed in this pull request?
This patch proposes to add subexpression elimination for interpreted expression evaluation. Interpreted expression evaluation is used when codegen was not able to work, for example complex schema.
Why are the changes needed?
Currently we only do subexpression elimination for codegen. For some reasons, we may need to run interpreted expression evaluation. For example, codegen fails to compile and fallbacks to interpreted mode, or complex input/output schema of expressions. It is commonly seen for complex schema from expressions that is possibly caused by the query optimizer too, e.g. SPARK-32945.
We should also support subexpression elimination for interpreted evaluation. That could reduce performance difference when Spark fallbacks from codegen to interpreted expression evaluation, and improve Spark usability.
Benchmark
Update
SubExprEliminationBenchmark:Before:
After:
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test. Benchmark manually.