[SPARK-20392][SQL] Set barrier to prevent re-entering a tree #17770
Conversation
Test build #76174 has finished for PR 17770 at commit
```scala
case class Barrier(node: Option[TreeNode[_]] = None)
```
Why not just create a logical plan node and override the transformUp/transformDown functions?
My original thought was: if we use a barrier node, we need to modify many places where we create a new logical plan and wrap it with the barrier node.

I will revamp it with a barrier node.
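For reference, a minimal sketch of what such a barrier node could look like, assuming it lives alongside the other catalyst logical operators (the exact node added by this PR may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Sketch only: a wrapper that hides an already-analyzed subtree. Because it
// is a LeafNode, transformUp/transformDown never recurse into `child`, so
// analyzer rules cannot re-enter the analyzed plan beneath the barrier.
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
  // Expose the child's schema so the barrier is transparent to resolution.
  override def output: Seq[Attribute] = child.output
}
```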
Test build #76182 has finished for PR 17770 at commit
Can we fix the description? It is really confusing since it uses the word exchange. Also, can we just skip a plan in transform if it is already resolved?
@hvanhovell @rxin Thanks for the comments. OK. Based on your suggestion, it looks like we have two options:

May I ask which one is preferred?
Option 2 may not work because transformUp/transformDown is still used in the Optimizer.
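To spell out the concern, here is a hedged sketch of an optimizer-style rule (the rule itself is made up): any node that disabled transformDown on itself would also block rewrites like this during optimization, not just during analysis.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

// Illustrative rule: drop filters whose condition is literally true.
// Optimizer rules are written in terms of transformDown/transformUp, so a
// node that overrode those methods to no-ops would hide its whole subtree
// from rules like this one as well.
object RemoveTrivialFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case Filter(Literal(true, BooleanType), child) => child
  }
}
```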
```diff
   * `fromRow` method later.
   */
- private val boundEnc =
+ private lazy val boundEnc =
```
We can't make boundEnc a lazy val because we need an early exception when the encoder can't be resolved.
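A self-contained sketch of the timing difference being argued here (names are illustrative, not Spark internals):

```scala
// Illustrative: `resolve` stands in for encoder resolution that may fail.
class EagerHolder(resolve: () => Int) {
  private val bound = resolve()      // fails fast, at construction time
  def use(): Int = bound
}

class LazyHolder(resolve: () => Int) {
  private lazy val bound = resolve() // defers the failure to first access
  def use(): Int = bound
}

object LazyValDemo extends App {
  val failing: () => Int = () => throw new IllegalStateException("unresolved")
  // new EagerHolder(failing)           // would throw right here
  val holder = new LazyHolder(failing)  // constructs fine...
  // holder.use()                       // ...and only throws when first used
}
```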
For self-join de-duplication, we only set the barrier for the left side.
I am wondering if we should check whether there's duplication between the right and left sides, and decide based on that whether to use a barrier for the right side.
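For context, a hedged illustration of the self-join case under discussion, assuming an active SparkSession named spark:

```scala
// Both sides of the join originate from the same analyzed plan, so their
// attributes initially share expression IDs. The analyzer de-duplicates one
// side; a barrier on both sides could prevent that rewrite, which is
// presumably why only the left side is wrapped here.
val df = spark.range(10).toDF("key")
val joined = df.join(df, "key")
joined.explain(true) // the analyzed plan shows de-duplicated attribute IDs
```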
Test build #76219 has finished for PR 17770 at commit

Test build #76222 has finished for PR 17770 at commit
@hvanhovell @rxin I've updated this accordingly. Do you have more comments on this? Thanks.

Test build #76324 has finished for PR 17770 at commit

Test build #76326 has finished for PR 17770 at commit
cc @cloud-fan

we can use

There are many

after we have this, can we remove the

It is possible as I think

I think we should have a single way to stop analyzing already analyzed plans. We should either apply

Using

Is the analysis barrier applicable for all the cases?

Currently I think it can cover all cases of
retest this please.

Test build #77295 has finished for PR 17770 at commit

Test build #77331 has finished for PR 17770 at commit

Test build #77332 has finished for PR 17770 at commit

@cloud-fan @gatorsmile Please let me know if you have more comments on this change. Thanks.
```diff
   */
  private val boundEnc =
-   exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
+   exprEnc.resolveAndBind(planWithBarrier.output, sparkSession.sessionState.analyzer)
```
I think we should only use planWithBarrier where necessary; this place is obviously unnecessary.
```diff
      s"New column names (${colNames.size}): " + colNames.mkString(", "))

-   val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
+   val newCols = planWithBarrier.output.zip(colNames).map { case (oldAttribute, newName) =>
```
ditto
```diff
  @Experimental
  @InterfaceStability.Evolving
- def isStreaming: Boolean = logicalPlan.isStreaming
+ def isStreaming: Boolean = planWithBarrier.isStreaming
```
ditto
```diff
    sparkSession,
    LogicalRDD(
-     logicalPlan.output,
+     planWithBarrier.output,
```
ditto
```diff
  val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
  def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
- assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+ assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
```
unnecessary change?
queryExecution.logical is the raw logical plan, before the analysis barriers are eliminated. The extra barrier nodes make this test fail on the raw plan.
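A short sketch of the distinction, for anyone reading along (output shapes are illustrative):

```scala
// Illustrative: compare the two plans. The raw plan keeps whatever the DSL
// built, barrier wrappers included; the analyzed plan has them eliminated,
// so node-counting assertions are only stable against the analyzed plan.
val qe = doubleRepartitioned.queryExecution
println(qe.logical.treeString)  // raw plan, barrier nodes still present
println(qe.analyzed.treeString) // after analysis, barriers removed
```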
ah i see
LGTM except some minor comments

Test build #77400 has finished for PR 17770 at commit

thanks, merging to master!

I think this will cause conflicts if we backport new PRs to branch 2.2. @viirya can you send a new PR to backport it to branch 2.2? Thanks!
Whoa, I do not think we should backport a large change to the inner workings of the analyzer.

Ok, I won't backport this until we reach a consensus.

Hi @viirya, as this PR already missed the Spark 2.2 release, I'd like to revert it and re-merge it at the end of Spark 2.3, so that future analyzer-related PRs won't get conflicted when backporting to 2.2. I'm pretty sorry about this; I'll mark it as a blocker for Spark 2.3 so that we don't forget. What do you think?

@cloud-fan Ok. No problem for me. Thanks.

reverted, thanks for your understanding!

Hi @viirya, since it's close to Spark 2.3, would you like to reopen this PR? Thanks!

@cloud-fan Sure. It seems there is no option to reopen it since it was merged before. Should I create another PR for it?

yea, a new PR sounds good, thanks!
What changes were proposed in this pull request?

It is reported that there is a performance regression when applying an ML pipeline to a dataset with many columns but few rows.

A big part of the regression comes from operations (e.g., select) on DataFrame/Dataset that re-create a new DataFrame/Dataset with a new LogicalPlan. In typical SQL usage this cost can be ignored. However, it is not rare to chain dozens of pipeline stages in ML, and as the query plan grows incrementally while running those stages, the total cost of re-creating DataFrames grows too. In particular, the Analyzer walks the entire big query plan even though most of it is already analyzed.

By eliminating part of this cost, the time to run the example code locally is reduced from about 1 min to about 30 secs.

In particular, the time spent applying the pipeline locally is mostly in calling transform on the 137 Bucketizers. Before the change, each call to a Bucketizer's transform can cost about 0.4 sec, so the total time spent on all Bucketizers' transform is about 50 secs. After the change, each call only costs about 0.1 sec. We also make boundEnc a lazy val to reduce unnecessary running time.
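To make the growth concrete, here is a hedged sketch of the access pattern that triggers the problem (the column names and loop bound are illustrative, assuming an active SparkSession named spark):

```scala
// Illustrative only: each transform-like step derives a new Dataset whose
// logical plan contains the previous plan as a subtree. Without a barrier,
// the analyzer re-walks the entire accumulated plan at every step, so the
// total analysis work grows roughly quadratically with the stage count.
var df = spark.range(100).toDF("c0")
for (i <- 1 to 137) { // ~ the 137 Bucketizers from the report
  df = df.withColumn(s"c$i", df("c0") + i) // plan depth grows by one per step
}
```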
Performance improvement

The code and datasets provided by Barry Becker to reproduce this issue and benchmark it can be found on the JIRA.

Before this patch: about 1 min
After this patch: about 20 secs
How was this patch tested?
Existing tests.