
Conversation

@AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented Nov 13, 2019

What changes were proposed in this pull request?

The original way to judge whether a Dataset is empty is:

 def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0) == 0
  }

This adds two shuffles, from limit() and groupBy().count(), and then collects the aggregated count to the driver.
Collecting only a single count avoids an OOM on the driver, but it triggers computation of all partitions and adds extra shuffle stages.

We change it to

  def isEmpty: Boolean = withAction("isEmpty", select().queryExecution) { plan =>
    plan.executeTake(1).isEmpty
  }

After this PR, we apply column pruning to the original LogicalPlan (via select() with no columns) and use the executeTake() API,
so no extra shuffle is added and only one partition's data is computed in the last stage.
This reduces the cost of calling Dataset.isEmpty() and still avoids memory issues on the driver side.
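
For reference, a minimal sketch (not part of the code change itself; it assumes an active SparkSession named `spark`) of the user-facing behavior and a quick way to eyeball the column pruning:

```scala
// Sketch only; assumes a running SparkSession called `spark`.
val empty = spark.range(0)
val nonEmpty = spark.range(10)

assert(empty.isEmpty)      // no rows: executeTake(1) returns an empty array
assert(!nonEmpty.isEmpty)  // the first fetched row is enough to answer false

// select() with no columns prunes the output, so the optimized plan
// carries an empty schema before executeTake(1) runs.
println(nonEmpty.select().queryExecution.optimizedPlan.schema)
```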

Why are the changes needed?

Optimize Dataset.isEmpty()

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests.

@AngersZhuuuu
Contributor Author

AngersZhuuuu commented Nov 13, 2019

@cloud-fan
As we discussed in #26437 (comment), is this way better?

```diff
- def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
-   plan.executeCollect().head.getLong(0) == 0
+ def isEmpty: Boolean = withAction("isEmpty", queryExecution) { plan =>
```
Contributor

shall we do column pruning?

Contributor Author

> shall we do column pruning?

Of course, I'll add it.

@cloud-fan
Contributor

Can we have some benchmark numbers?

@cloud-fan
Contributor

ok to test

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113700 has finished for PR 26500 at commit 56d2093.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113711 has finished for PR 26500 at commit 908a39f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Nov 14, 2019

Can you add more detail to the PR description for better commit logs, e.g. how it is optimized?

@srowen
Member

srowen commented Nov 20, 2019

Ping @AngersZhuuuu

@AngersZhuuuu
Contributor Author

> Ping @AngersZhuuuu

Thanks for the ping, and sorry for leaving this work pending; I've been a little busy these days.
Starting on it now.

@AngersZhuuuu
Contributor Author

  test("benchmark of empty") {
    var start = System.currentTimeMillis()
    var isEmpty = spark.range(10000000)
      .repartition(100)
      .limit(1)
      .groupBy()
      .count()
      .queryExecution.executedPlan.executeCollect().head.getLong(0) == 0
    println(isEmpty)
    var end = System.currentTimeMillis()
    // scalastyle:off
    println(s"duration = ${end - start}")

    start = System.currentTimeMillis()
    isEmpty = spark.range(10000000)
      .repartition(100)
      .select()
      .queryExecution.executedPlan.executeTake(1) == 0
    println(isEmpty)
    end = System.currentTimeMillis()
    // scalastyle:off
    println(s"duration = ${end - start}")
  }

Result (old implementation first, durations in ms):
false
duration = 7248
false
duration = 1449

@cloud-fan @maropu @srowen
The test case is simple, but it mimics the behavior before and after the API change.

@cloud-fan
Contributor

Great! Can you enrich the PR description? "Optimize Dataset.isEmpty()" is fine for the "Why" section, but we need to put more in the "What" section, e.g. that we change the implementation to avoid shuffles.

@AngersZhuuuu
Contributor Author

> Great! Can you enrich the PR description? "Optimize Dataset.isEmpty()" is fine for the "Why" section, but we need to put more in the "What" section, e.g. that we change the implementation to avoid shuffles.

Updated. Is it clear now?

@cloud-fan
Contributor

> will add three shuffles by limit(), groupby() and count()

Have you confirmed? groupBy + count is one operator, called Aggregate.

@AngersZhuuuu
Contributor Author

> > will add three shuffles by limit(), groupby() and count()
>
> Have you confirmed? groupBy + count is one operator, called Aggregate.

Updated; count won't trigger an extra shuffle.
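
A quick way to double-check the shuffle count is to count the exchange nodes in the physical plan. This is just a sketch (it assumes a local `spark` session):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Physical plan of the old isEmpty query: limit(1) followed by a global aggregate.
val oldPlan = spark.range(100).limit(1).groupBy().count()
  .queryExecution.executedPlan

// groupBy().count() compiles to a single Aggregate, so together with limit(1)
// we expect two ShuffleExchangeExec nodes here, not three.
val numShuffles = oldPlan.collect { case e: ShuffleExchangeExec => e }.size
println(s"shuffles in old isEmpty plan: $numShuffles")
```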

@cloud-fan cloud-fan closed this in 6146dc4 Nov 21, 2019
@cloud-fan
Contributor

thanks, merging to master!

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM too

rahij pushed a commit to palantir/spark that referenced this pull request Jan 10, 2020
bulldozer-bot bot pushed a commit to palantir/spark that referenced this pull request Jan 14, 2020
