-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-20392][SQL] Set barrier to prevent re-entering a tree #19873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @cloud-fan @hvanhovell Basically this is the same changes in #17770. |
|
Test build #84417 has finished for PR 19873 at commit
|
|
retest this please. |
|
Test build #84420 has finished for PR 19873 at commit
|
136fd30 to
9f5a0e4
Compare
|
Test build #84430 has finished for PR 19873 at commit
|
| * | ||
| * @param rule the function use to transform this nodes children | ||
| */ | ||
| def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also remove the analyzed flag in this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
| val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5) | ||
| def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length | ||
| assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3) | ||
| assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a necessary change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see previous discussion: https://github.com/apache/spark/pull/17770/files#r118480364
|
LGTM, also cc @gatorsmile |
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||
| case p if p.analyzed => p |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, what do you mean why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which cases, we should still use the analyzed flag?
| * for all conflicting attributes. | ||
| */ | ||
| private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = { | ||
| private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is oriRight ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use originalRight
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
| } | ||
| // For partitioned relation r, r.schema's column ordering can be different from the column | ||
| // ordering of data.logicalPlan (partition columns are all moved after data column). This | ||
| // ordering of data.logicalPlan (partition columns are all moved after data column). This |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get rid of changes in this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
| case sa @ Sort(_, _, child: Aggregate) => sa | ||
|
|
||
| case s @ Sort(order, _, child) if !s.resolved && child.resolved => | ||
| case s @ Sort(order, _, oriChild) if !s.resolved && oriChild.resolved => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use originalChild
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
| } | ||
|
|
||
| case f @ Filter(cond, child) if !f.resolved && child.resolved => | ||
| case f @ Filter(cond, oriChild) if !f.resolved && oriChild.resolved => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use originalChild
|
From the PR description, I am unable to tell the changes made in this PR. We need a better description to explain what is the solution proposed in this PR. Also explains which cases need a special handling and the reason. |
bae034d to
54182bf
Compare
|
Test build #84475 has finished for PR 19873 at commit
|
|
Test build #84477 has finished for PR 19873 at commit
|
|
@viirya Could you resolve the conflicts? |
| } | ||
|
|
||
| /** A logical plan for setting a barrier of analysis */ | ||
| case class AnalysisBarrier(child: LogicalPlan) extends LeafNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put the PR descriptions to the comment of this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
| /** | ||
| * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly | ||
| * skips sub-trees that have already been marked as analyzed. | ||
| * This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since both transformUp and transformDown work, create a test case using transformDown. Also update the comments here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
| case Kurtosis(e @ StringType()) => Kurtosis(Cast(e, DoubleType)) | ||
| } | ||
| override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = | ||
| plan transformAllExpressions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For indentation...
|
Test build #84518 has finished for PR 19873 at commit
|
|
Test build #84520 has finished for PR 19873 at commit
|
|
LGTM |
|
Thanks! Merged to master. |
|
Thanks! @gatorsmile @cloud-fan |
What changes were proposed in this pull request?
The SQL
Analyzergoes through a whole query plan even most part of it is analyzed. This increases the time spent on query analysis for long pipelines in ML, especially.This patch adds a logical node called
AnalysisBarrierthat wraps an analyzed logical plan to prevent it from analysis again. The barrier is applied to the analyzed logical plan inDataset. It won't change the output of wrapped logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset will be put on the barrier, so only the new nodes created will be analyzed.This analysis barrier will be removed at the end of analysis stage.
How was this patch tested?
Added tests.