
Conversation

@viirya
Member

@viirya viirya commented Sep 22, 2018

What changes were proposed in this pull request?

This issue was discovered during #21738 .

It turns out that limit is not whole-stage-codegened correctly and always consumes all the inputs.

This patch fixes limit's whole-stage codegen. Some nodes like hash aggregate and range have loop structures that don't properly check the condition to stop early. They are fixed to stop consuming inputs after the limit number is reached.

Specifically, this patch removes the local stopEarly variable added by limit exec operators and adds an isStopEarly class variable to BufferedRowIterator. The previous approach was buggy when there are multiple limit operators in one BufferedRowIterator: the overridden stopEarly() only checks the local stopEarly variable of the last (downstream) limit operator. It doesn't work for the limit + blocking operator (e.g. aggregate) + limit case.
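For illustration, a sketch of the previous buggy pattern (the variable names are illustrative, not the exact generated identifiers):

// Each limit operator added its own local flag and generated an override of
// stopEarly(). With two limit operators in one BufferedRowIterator, only the
// last generated override takes effect, so the upstream limit's flag is
// never checked:
private boolean stopEarly_1 = false;  // set by the upstream limit
private boolean stopEarly_2 = false;  // set by the downstream limit

@Override
protected boolean stopEarly() {
  return stopEarly_2;  // only the downstream limit's flag is consulted
}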

The limit operator now works in whole stage codegen as follows.

  1. Limit operators call setStopEarly(true) when the given limit number is reached. This sets the isStopEarly flag. Before the query pipeline consumes the next record, it checks the flag by calling the stopEarly() method. If the flag is set, the pipeline stops consuming records.

For example,

while (iterator.hasNext() && !stopEarly()) {
  // upstream operators
  ...
  // code of limit operator
  if (limit_op_count < given_limit) {
    limit_op_count++;
    if (limit_op_count == given_limit) {
      // when hit last record, set the flag to stop the query pipeline
      setStopEarly(true);
    }
    // consume by downstream ops
    ...
  }
  ...
}
  2. Blocking operators (e.g., sort, aggregate) call setStopEarly(false) to reset the flag once all records from upstream operators are consumed. This lets limit operators, if any, downstream of the blocking operators work correctly.

For example,

if (notSortYet) {
  // consume all records from upstream
  sorter.sort();
  // reset stop early flag
  setStopEarly(false);
}

while (sorter.hasMoreRecords && !stopEarly()) {
  // downstream operators
  // if there is downstream limit operator
  if (limit_downstream_op_count < given_limit) {
    limit_downstream_op_count++;
    if (limit_downstream_op_count == given_limit) {
      // when hit last record, set the flag to stop the query pipeline
      setStopEarly(true);
    }
    // consume by downstream ops
    ...
  }
}

How was this patch tested?

Added tests.

| int $localEnd = (int)($range / ${step}L);
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $number;
| $numOutput.add(1);
Contributor

can this introduce a perf regression?

Member Author

I'm not worried about it, since it is a simple op.

Contributor

This is very likely to cause a perf regression since it's not a tight loop anymore.

We want the range operator to stop early for better performance, but that doesn't mean the range operator must return exactly the limit number of records. Since the range operator already returns data in batches, I think we can stop early at batch granularity.
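For instance, a rough sketch of batch-granularity stopping in the generated range loop (batchSize, numElementsTodo, and nextValue are illustrative names, not the exact generated identifiers):

while (numElementsTodo > 0 && !stopEarly()) {
  // check the stop condition once per batch; the inner per-row loop stays tight
  long batchTodo = Math.min(batchSize, numElementsTodo);
  for (long idx = 0; idx < batchTodo; idx++) {
    long value = nextValue + idx * step;
    // consume value downstream, with no per-row stop check
  }
  nextValue += batchTodo * step;
  numElementsTodo -= batchTodo;
}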

Member Author

OK, then I should revert the numOutput change if the number of records can be a bit inaccurate.

| $stopEarly = true;
| }
| ${consume(ctx, input)}
| } else {
Contributor

Do we need to remove this? Isn't it safer to leave it here?

Member Author

I don't think we ever execute this path. If we do, there's a bug.

@SparkQA

SparkQA commented Sep 22, 2018

Test build #96471 has finished for PR 22524 at commit 12703bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs [WIP][SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs Sep 22, 2018
@viirya
Member Author

viirya commented Sep 23, 2018

Is jenkins down now?

@xuanyuanking
Member

Is jenkins down now?

Does this mean you got a "Reason: Error during SSL Handshake with remote server" after opening the Jenkins link?

@viirya
Member Author

viirya commented Sep 23, 2018

Does this mean you got a "Reason: Error during SSL Handshake with remote server" after opening the Jenkins link?

Yes.

@xuanyuanking
Member

Got it, same for me :(

@xuanyuanking
Member

@viirya As @shaneknapp replied on the mailing list, you can try https://hadrian.ist.berkeley.edu/jenkins/. Thanks @shaneknapp :)

@viirya
Member Author

viirya commented Sep 24, 2018

@xuanyuanking Thanks.

}

test("SPARK-18004 limit + aggregates") {
test("SPARK-18528 limit + aggregates") {
Member Author

This JIRA number is wrong.

@SparkQA

SparkQA commented Sep 24, 2018

Test build #96499 has finished for PR 22524 at commit a09e60f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Sep 24, 2018

retest this please.

@SparkQA

SparkQA commented Sep 24, 2018

Test build #96501 has finished for PR 22524 at commit a09e60f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [WIP][SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs Sep 24, 2018
@viirya
Member Author

viirya commented Sep 24, 2018

cc @cloud-fan

// Indicates whether the query execution should be stopped even if input rows are
// still available. This is used by the limit operator: when it reaches the given
// number of rows to limit, this flag is set and execution should stop.
protected boolean isStopEarly = false;
Contributor

what if there are 2 limits in the query?

Member Author

I've added a test for 2 limits.

When either of the 2 limits sets isStopEarly, I think the execution should be stopped. Is there any case where the opposite is true?

/**
* Sets the flag of stopping the query execution early.
*/
public void setStopEarly(boolean value) {
Contributor

Can we have more documentation about how to use it? For now I see 2 use cases:

  1. limit operators should call it with true when the limit is hit
  2. blocking operators (sort, agg, etc.) should call it with false to reset it
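For instance, such documentation on BufferedRowIterator could look like the sketch below (paraphrased; the method bodies are assumptions based on the snippets in this PR, not the exact final code):

// inside BufferedRowIterator
protected boolean isStopEarly = false;

/**
 * Sets the flag of stopping the query execution early.
 *
 * Use case 1: a limit operator calls setStopEarly(true) once the limit is hit.
 * Use case 2: a blocking operator (sort, aggregate, etc.) calls setStopEarly(false)
 * after it has consumed all upstream records, so limits downstream of it start
 * from a clean flag.
 */
public void setStopEarly(boolean value) {
  isStopEarly = value;
}

// Checked by the generated loops before consuming the next record.
protected boolean stopEarly() {
  return isStopEarly;
}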

Member Author

Ok. Let me add it.

Member Author

You also hinted that we should reset the stop-early flag in the sort exec node too. I will add it and a related test.

| if ($countTerm < $limit) {
| $countTerm += 1;
| if ($countTerm == $limit) {
| setStopEarly(true);
Contributor

shall we do this after consume?

Member Author

Won't we call shouldStop inside consume? If so, stopEarly will not be set.

Contributor

if ($countTerm == $limit) means this is the last record, and we should still consume it?

Member Author

Oh, I see. And I think shouldStop shouldn't be called inside consume.

Member Author

Actually, looking at the query again, there should not be a stopEarly check inside consume that prevents us from consuming the last record, because the check belongs in the outer while loop.

The cases that do have a stopEarly check inside consume are blocking operators like sort and aggregate; for them we need to reset the flag.

But for safety, I think I will also move this after consume.
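That is, a sketch of the reordered template (using the same $countTerm/$limit placeholders as the diff above):

if ($countTerm < $limit) {
  $countTerm += 1;
  // consume the record first, so the last record within the limit is kept
  ${consume(ctx, input)}
  if ($countTerm == $limit) {
    // only after the last record is consumed, flag the pipeline to stop
    setStopEarly(true);
  }
}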

@cloud-fan
Contributor

cloud-fan commented Sep 25, 2018

It would be great to explain how limit works in whole stage codegen in general. This part is a little hard to understand, and I believe many operators need to deal with limit as well.

@viirya
Member Author

viirya commented Sep 25, 2018

It would be great to explain how limit works in whole stage codegen in general. This part is a little hard to understand, and I believe many operators need to deal with limit as well.

OK. Let me add more explanation to the PR description.

@SparkQA

SparkQA commented Sep 25, 2018

Test build #96546 has finished for PR 22524 at commit 2f4d356.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@viirya thanks for adding the explanation! I think it's very clear and helpful. By reading this, I have a new idea.

It seems to me that limit is mostly about stopping upstream operators from producing data early, so the code template should look like:

while (iterator.hasNext() && !stopEarly()) {
  // upstream operators
  ...
  if (count < given_limit) {
    count += 1;
    consume(...);  // downstream operators
  } else {
    setStopEarly(true);
  }
  ...
}

@viirya
Member Author

viirya commented Sep 25, 2018

Doesn't this way consume one more record than the given limit number?

@cloud-fan
Contributor

We can always tune if (count < given_limit) to consume one more or one fewer record, can't we?

@viirya
Member Author

viirya commented Sep 25, 2018

Hmm, in the way suggested above, isn't setStopEarly(true) only called when consuming the (given_limit + 1)-th record? Or am I misunderstanding it?

@cloud-fan
Contributor

I didn't look into it, but we can change if (count < given_limit) to if (count < given_limit - 1) if you are right.

My focus is the code template: without the if/else, how can the downstream operators stop consuming data?

@viirya
Member Author

viirya commented Sep 26, 2018

Oh, I see. The code template may be causing some confusion; I'll change it. The downstream code is wrapped inside an if block, but I didn't clearly show how downstream operators work in codegen. Let me update it.

@SparkQA

SparkQA commented Sep 27, 2018

Test build #96675 has finished for PR 22524 at commit 6d95b65.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Sep 27, 2018

retest this please.

@SparkQA

SparkQA commented Sep 27, 2018

Test build #96683 has finished for PR 22524 at commit 6d95b65.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96807 has finished for PR 22524 at commit ed2c269.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Oct 1, 2018

ping @cloud-fan @mgaido91 Any more comments or questions on this change?

.filter('id2 >= 0)
twoLimitsDF.collect()
val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
case f@FilterExec(_, _: RangeExec) => f
Contributor

nit: spaces

.selectExpr("1 + id2 as id3")
sortDF.collect()
val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
case l@LocalLimitExec(_, f: FilterExec) => f
Contributor

nit: spaces

.selectExpr("id + 1 as id2").limit(1).filter('id > 50)
filterDF.collect()
val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
case f@FilterExec(_, r: RangeExec) => f
Contributor

nit: spaces

@mgaido91
Contributor

mgaido91 commented Oct 1, 2018

@viirya My only concern is that this relies on resetting the flag to false in many places, so maintenance may become difficult. The same goes for stopEarly(): I am afraid we may forget to maintain it properly in the future and miss some places. But I am not sure there is a better solution. Is there a way we can achieve the same thing by adding a global boolean when there is a limit operation? If not, and other people agree with this change, I am fine with it anyway, as I don't have a better suggestion. Thanks.

@viirya
Member Author

viirya commented Oct 2, 2018

Thanks @mgaido91. Resetting the flag in blocking operators is, I think, a feasible solution for now to the issue of the limit operator consuming all inputs. For the current blocking operators, we should add enough regression tests. I don't think we will add new operators frequently, although I agree that maintenance might be a potential issue.

I'm not sure a global boolean would be significantly different from the current flag; this flag is already a global boolean variable.

Fortunately I think this might not be so urgent to fix (is it?), so we may be able to wait for more options.

@SparkQA

SparkQA commented Oct 2, 2018

Test build #96860 has finished for PR 22524 at commit 1b2ab61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Oct 5, 2018

Closing this in favor of #22630.

@viirya viirya closed this Oct 5, 2018
asfgit pushed a commit that referenced this pull request Oct 9, 2018
…not consume all the inputs

## What changes were proposed in this pull request?

This PR is inspired by #22524, but proposes a safer fix.

The current limit whole stage codegen has 2 problems:
1. It's only applied to `InputAdapter`; many leaf nodes can't stop earlier w.r.t. limit.
2. It needs to override a method, which will break if we have more than one limit in the whole-stage.

The first problem is easy to fix: just figure out which nodes can stop earlier w.r.t. limit, and update them. This PR updates `RangeExec`, `ColumnarBatchScan`, `SortExec`, `HashAggregateExec`.

The second problem is hard to fix. This PR proposes to propagate the limit counter variable name upstream, so that the upstream leaf/blocking nodes can check the limit counter and quit the loop earlier.

For better performance, the implementation here follows `CodegenSupport.needStopCheck`, so that we only codegen the check if there is a limit in the query. For columnar nodes like range, we check the limit counter per-batch instead of per-row, to keep the inner loop tight and fast.

Why is this safer?
1. The leaf/blocking nodes don't have to check the limit counter and stop earlier; it's only for performance (this is the same as before).
2. The blocking operators can stop propagating the limit counter name, because the counter of a limit after a blocking operator will never increase before the blocking operator has consumed all the data from upstream operators, so upstream operators don't care about limits after blocking operators. This is also for performance only; it's OK if we forget to do it for some new blocking operators.

## How was this patch tested?

a new test

Closes #22630 from cloud-fan/limit.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Kazuaki Ishizaki <[email protected]>
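As an illustration of the counter propagation described above, a rough sketch of the generated shape (count_1 and limit_1 are made-up names for a propagated limit counter and its limit; see #22630 for the actual generated code):

// A leaf node such as range receives the names of downstream limit counters
// and checks them at batch granularity, keeping the per-row inner loop tight.
while (index < numElements && count_1 < limit_1) {  // checked once per batch
  long batchEnd = Math.min(index + batchSize, numElements);
  while (index < batchEnd) {
    // emit one row; the downstream limit operator increments count_1
    index++;
  }
}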
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Jul 7, 2020
@viirya viirya deleted the SPARK-25497 branch December 27, 2023 18:35