[SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not consume the input when not necessary #22621
Conversation
| s"if (shouldStop()) { $number = $value + ${step}L; return; }" | ||
|
|
||
| val processingLoop = if (parent.needStopCheck) { | ||
| // TODO (cloud-fan): do we really need to do the stop check within batch? |
This is the motivation for bringing up the discussion at #10989 (comment).
If it's OK to not interrupt the loop and buffer the result rows for join, I think it's also OK here.
if we don't, then we would consume more rows than needed, wouldn't we?
we just buffer more rows in `BufferedRowIterator.currentRows`; it's only about performance IIUC.
mmmh, but localIdx would become localEnd then, right? So the UTs you added would fail, or am I missing something?
I think the fact that there is a BroadcastHashJoin case doesn't mean it is generally OK to buffer more rows. If possible, we should still avoid it.
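To make the trade-off concrete, here is a minimal Scala sketch of the per-row stop check under discussion; the real code is Java generated by `RangeExec`, and `consume` and `shouldStop` are illustrative stand-ins, not Spark APIs:

```scala
// Illustrative sketch only (assumes batchEnd - batchStart is a multiple of step).
def produceBatch(
    batchStart: Long,
    batchEnd: Long,
    step: Long,
    consume: Long => Unit,
    shouldStop: () => Boolean): Unit = {
  var value = batchStart
  while (value != batchEnd) {
    consume(value)
    value += step
    // Per-row check: the loop exits as soon as the consumer has enough rows.
    // Checking only once per batch instead means the remaining rows of the
    // batch are still produced and buffered, which is what skews the metrics.
    if (shouldStop()) return
  }
}
```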
ok to test
mgaido91
left a comment
thanks for pinging me @cloud-fan.
while (iter.hasNext && (n < 0 || count < n)) {
// `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
// not hit.
while ((n < 0 || count < n) && iter.hasNext) {
nice catch on this one!
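As a side note for readers: the operand order matters because Scala's `&&` short-circuits, and a `hasNext` that pre-fetches (as the comment in the diff describes) does work as a side effect. The class below is a hypothetical illustration of that pattern, not Spark code:

```scala
// A hasNext that pre-fetches the next row, as described in the diff's comment.
class PrefetchingIterator(underlying: Iterator[Int]) extends Iterator[Int] {
  private var buffered: Option[Int] = None
  var rowsProduced = 0 // stand-in for the numOutputRows metric

  override def hasNext: Boolean = {
    if (buffered.isEmpty && underlying.hasNext) {
      buffered = Some(underlying.next()) // producing a row is a side effect of hasNext
      rowsProduced += 1
    }
    buffered.isDefined
  }

  override def next(): Int = {
    val v = buffered.get
    buffered = None
    v
  }
}

// `iter.hasNext && count < n` pre-fetches one extra row on the iteration that hits
// the limit; `(count < n) && iter.hasNext` short-circuits before touching the iterator.
```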
| $shouldStop
| $processingLoop
| } else {
|   long $nextBatchTodo;
why did you move these lines into the else?
Now we don't `return` when we need to interrupt the loop. I moved these lines into the `else` so that we won't hit this code path when the loop is interrupted.
I see, but this way we loop two more times in the outer loop, because we go into either the if or the else, while previously we did both on the same iteration IIUC. I don't think it is a big issue, but it may introduce a (probably very small) overhead compared to the previous case.
Since IIUC on the first iteration we now just go to the else branch (batchEnd is initialized to nextIndex), do you think it is feasible to move this block before the inner loop? That would solve both issues, right?
good idea!
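For the record, a rough Scala sketch of the control flow after moving the batch setup before the inner loop, per the suggestion above; the real code is Java generated by `RangeExec`, and all names here are illustrative (assuming a positive `step` and a range aligned to it):

```scala
def produceRange(
    start: Long,
    end: Long,
    step: Long,
    batchSize: Long,
    consume: Long => Unit,
    shouldStop: () => Boolean): Unit = {
  var nextIndex = start
  var batchEnd = nextIndex // initialized to nextIndex, as noted above
  while (nextIndex != end) {
    if (nextIndex == batchEnd) {
      // Batch setup now runs before the inner loop, so the first outer
      // iteration is no longer spent only initializing batchEnd.
      val todo = math.min(batchSize, (end - batchEnd) / step)
      batchEnd += todo * step
    }
    while (nextIndex != batchEnd) { // inner loop: produce all values of the batch
      consume(nextIndex)
      nextIndex += step
    }
    // Metrics are updated and the stop check happens once per batch, with no
    // `return` from inside the inner loop.
    if (shouldStop()) return
  }
}
```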
| s"if (shouldStop()) { $number = $value + ${step}L; return; }" | ||
|
|
||
| val processingLoop = if (parent.needStopCheck) { | ||
| // TODO (cloud-fan): do we really need to do the stop check within batch? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we don't, then we would consume more rows than needed, don't we?
df.queryExecution.executedPlan.foreach {
  case w: WholeStageCodegenExec =>
    w.child.foreach {
      case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
not a big issue, but if we change things later and these nodes are no longer here, we would not run the asserts. I would suggest collecting the FilterExec and the RangeExec, enforcing that we collected exactly one of each, and then asserting on them. What do you think?
Moreover, nit: would it be possible to dedup the code here? The tests are very similar with codegen on and off; only collecting the two exec nodes differs...
// and a new batch is started.
// In the implementation below, the code in the inner loop is producing all the values
// within a batch, while the code in the outer loop is setting batch parameters and updating
// the metrics.
This comment should be updated too.
}

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
  // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
Does first task mean first partition?
yes
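In other words, a top-most limit is evaluated take-style: it launches a job on the first partition and scans later partitions only if it still needs rows. A hedged sketch of that idea (not Spark's actual implementation; names are illustrative):

```scala
// Illustrative only: scan partitions lazily, starting from the first, and stop
// as soon as n rows have been collected.
def takeFromPartitions[T](partitions: Seq[Iterator[T]], n: Int): Seq[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  val parts = partitions.iterator
  while (buf.length < n && parts.hasNext) {
    buf ++= parts.next().take(n - buf.length) // later partitions run only if needed
  }
  buf.toSeq
}
```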
retest this please.

Test build #96895 has finished for PR 22621 at commit

I simplified this PR to focus on

Test build #96920 has finished for PR 22621 at commit

Test build #96924 has finished for PR 22621 at commit

retest this please
gatorsmile
left a comment
LGTM pending Jenkins
mgaido91
left a comment
LGTM apart from one minor comment. Thanks
    df: DataFrame,
    filterNumOutputs: Int,
    rangeNumOutputs: Int): Unit = {
  var filter: FilterExec = null
what about something like this:

def collectExecNode[T](pf: PartialFunction[SparkPlan, T]): PartialFunction[SparkPlan, T] = {
  pf.orElse {
    case w: WholeStageCodegenExec =>
      w.child.collect(pf).head
  }
}

val range = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case r: RangeExec => r })
val filter = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case f: FilterExec => f })
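A hypothetical continuation of this suggestion, asserting on the collected nodes (reusing the `filterNumOutputs` and `rangeNumOutputs` parameters from the helper above):

```scala
// Fail fast if either node was not found, then assert on its metric.
assert(filter.isDefined && range.isDefined,
  "expected exactly one FilterExec and one RangeExec in the plan")
assert(filter.get.metrics("numOutputRows").value == filterNumOutputs)
assert(range.get.metrics("numOutputRows").value == rangeNumOutputs)
```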
In the future, if we need to catch more nodes, we should abstract it. But for now it's only range and filter, so I think it's OK.
LGTM

Test build #96927 has finished for PR 22621 at commit

thanks, merging to master/2.4!
[SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not consume the input when not necessary

## What changes were proposed in this pull request?

In `SparkPlan.getByteArrayRdd`, we should only call `it.hasNext` when the limit is not hit, as `iter.hasNext` may produce one row and buffer it, and cause wrong metrics.

## How was this patch tested?

new tests

Closes #22621 from cloud-fan/range.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 71c24aa)
Signed-off-by: Wenchen Fan <[email protected]>
Let's say this can be a behaviour change too, since the metrics are now changed. Should we update the migration guide, for safety?

why do we need a migration guide entry for a bug fix?

That's my point. Why would we have to document a fix for unexpected results?
What changes were proposed in this pull request?

In `SparkPlan.getByteArrayRdd`, we should only call `it.hasNext` when the limit is not hit, as `iter.hasNext` may produce one row and buffer it, and cause wrong metrics.

How was this patch tested?

new tests