Conversation

@yhuai (Contributor) commented Aug 21, 2015

https://issues.apache.org/jira/browse/SPARK-10143

With this PR, we set the min split size to parquet's block size (row group size) from the conf whenever the configured min split size is smaller. This avoids creating too many tasks, and in particular useless tasks, when reading parquet data.
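Conceptually, the change amounts to something like the sketch below (illustrative only, not the actual patch code; the helper name and the 128MB default, parquet's usual block size, are assumptions):

```
import org.apache.hadoop.conf.Configuration

// Sketch (not the actual patch): make sure Hadoop's min split size is at
// least one parquet row group, so no split covers less than one row group.
def adjustMinSplitSize(conf: Configuration): Unit = {
  // parquet.block.size is the row group size the files were written with.
  val parquetBlockSize = conf.getLong("parquet.block.size", 128L * 1024 * 1024)
  val minSplitSize =
    conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L)
  if (minSplitSize < parquetBlockSize) {
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize)
  }
}
```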

I tested it locally. The table I used is 343MB and sits in my local FS. Because I did not set any min/max split size, the default split size was 32MB and the map stage had 11 tasks, but only three of those tasks actually read data. With my PR, the map stage had only three tasks. Here is the difference.
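(For context, assuming parquet's default 128MB row group size, which is not stated above: 343MB / 32MB ≈ 11 splits, but the file holds only ceil(343 / 128) = 3 row groups, and each row group is read by exactly one split, so only 3 of the 11 tasks had any rows to read.)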

Without this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png)

With this PR:
![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png)

Even if the block size setting does not match the actual block size of the parquet file, I think it is still generally good to use parquet's block size setting when the min split size is smaller than that block size.

Tested it on a cluster using

```
val count = sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count
```

Basically, it reads zero columns of table `store_sales`. My table has 1824 parquet files whose sizes range from 80MB to 280MB (1 to 3 row groups each). Without this patch, on a 16-worker cluster, the job had 5023 tasks and took 102s. With this patch, the job had 2893 tasks and took 64s. It is still not as good as one mapper per file (1824 tasks, 42s), but it is much better than current master.
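Without this patch, a user could approximate the same effect by raising the min split size by hand before scanning. A workaround sketch, where the 128MB value assumes the default parquet row group size:

```
// Workaround sketch: force Hadoop's min split size up to one row group so
// FileInputFormat does not produce splits smaller than a row group.
sqlContext.sparkContext.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize", 128L * 1024 * 1024)
```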


@liancheng (Contributor)

LGTM except 1 minor issue.

@SparkQA commented Aug 21, 2015

Test build #41340 has finished for PR 8346 at commit e460545.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor (review comment)

logDebug.
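Presumably this asks for the patch's new log message to go out at debug rather than info level, along these lines (an illustrative guess, not the actual patch line; the message text is made up):

```
// Illustrative only: emit the split size adjustment at debug level.
logDebug(s"Using $parquetBlockSize (parquet block size) as the min split size " +
  "because the configured min split size is smaller.")
```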

@SparkQA commented Aug 21, 2015

Test build #41347 has finished for PR 8346 at commit aa40a88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 21, 2015

Test build #41349 has finished for PR 8346 at commit a0a9306.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 21, 2015

Test build #41374 has finished for PR 8346 at commit 20bcfa4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static class AppExecId
    • public static class StoreVersion


@yhuai (Contributor, Author) commented Aug 21, 2015

ok. I am merging it.

asfgit pushed a commit that referenced this pull request Aug 21, 2015
… as the min split size if necessary.


Author: Yin Huai <[email protected]>

Closes #8346 from yhuai/parquetMinSplit.

(cherry picked from commit e335509)
Signed-off-by: Yin Huai <[email protected]>
asfgit closed this in e335509 on Aug 21, 2015