Conversation

@davies
Contributor

@davies davies commented Jul 17, 2014

During aggregation in the Python worker, if memory usage exceeds spark.executor.memory, it will perform disk-spilling aggregation.

It splits the aggregation into multiple stages; in each stage, it partitions the aggregated data by hash and dumps the partitions to disk. After all the data have been aggregated, it merges all the stages together (partition by partition).
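As a rough sketch of that scheme (a hypothetical helper, not the PR's actual code; the entry-count limit and the file layout here are made up for illustration):

```python
import os
import pickle
import tempfile
from collections import defaultdict

def external_aggregate(items, combine, num_partitions=4, memory_limit=1000):
    """Toy hash-based spilling aggregation: combine (key, value) pairs into an
    in-memory dict; when it exceeds memory_limit entries, partition it by hash
    and spill each bucket to disk; finally merge the spills partition by
    partition, so only one partition's data is in memory at a time."""
    tmpdir = tempfile.mkdtemp()
    data, spills = {}, 0

    def spill():
        nonlocal spills
        # partition the in-memory map by hash and dump each bucket to its own file
        buckets = defaultdict(list)
        for k, v in data.items():
            buckets[hash(k) % num_partitions].append((k, v))
        for p, pairs in buckets.items():
            with open(os.path.join(tmpdir, "%d-%d" % (spills, p)), "wb") as f:
                pickle.dump(pairs, f)
        data.clear()
        spills += 1

    for k, v in items:
        data[k] = combine(data[k], v) if k in data else v
        if len(data) > memory_limit:
            spill()

    if spills == 0:
        return dict(data)          # everything fit in memory, no merge needed
    spill()                        # flush the remainder so all data is on disk
    result = {}
    for p in range(num_partitions):  # merge stage: one partition at a time
        merged = {}
        for s in range(spills):
            path = os.path.join(tmpdir, "%d-%d" % (s, p))
            if os.path.exists(path):
                with open(path, "rb") as f:
                    for k, v in pickle.load(f):
                        merged[k] = combine(merged[k], v) if k in merged else v
        result.update(merged)
    return result
```

For example, `external_aggregate(pairs, lambda a, b: a + b)` performs a word-count-style reduction while keeping the in-memory map bounded.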

@SparkQA

SparkQA commented Jul 17, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16772/consoleFull

@SparkQA

SparkQA commented Jul 17, 2014

QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class Merger(object):
class AutoSerializer(FramedSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16772/consoleFull

@SparkQA

SparkQA commented Jul 17, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16782/consoleFull


This should actually rotate among storage directories in spark.local.dir. Check out how the DiskStore works in Java.
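Rotating spill files across the configured local directories could be sketched like this (a hypothetical Python helper; the real rotation logic lives in the JVM's DiskStore, and the SPARK_LOCAL_DIRS environment variable is used here as a stand-in for the spark.local.dir setting):

```python
import os
import random

def get_spill_dir(subdir="python"):
    """Pick one of the configured local directories at random, so spill
    files are spread across all disks listed for local storage instead of
    always landing on a single one."""
    local_dirs = os.environ.get("SPARK_LOCAL_DIRS", "/tmp").split(",")
    path = os.path.join(random.choice(local_dirs), subdir)
    os.makedirs(path, exist_ok=True)
    return path
```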

@SparkQA

SparkQA commented Jul 17, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class Merger(object):
class AutoSerializer(FramedSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16782/consoleFull

@SparkQA

SparkQA commented Jul 18, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16835/consoleFull

@SparkQA

SparkQA commented Jul 18, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Merger(object):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16835/consoleFull

@mateiz
Contributor

mateiz commented Jul 20, 2014

It looks like pushing a new rebased commit hid my comments, but click on them above to make sure you see them.

Add spark.python.worker.memory to limit the memory used by the Python worker.
The default is 512m.
@SparkQA

SparkQA commented Jul 21, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull

davies added 2 commits July 21, 2014 12:10
@SparkQA

SparkQA commented Jul 21, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull

@SparkQA

SparkQA commented Jul 21, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Merger(object):
class MapMerger(Merger):
class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull

@SparkQA

SparkQA commented Jul 21, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Merger(object):
class MapMerger(Merger):
class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull

@SparkQA

SparkQA commented Jul 21, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull

@SparkQA

SparkQA commented Jul 21, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Merger(object):
class MapMerger(Merger):
class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull


Small typo: go -> goes

@SparkQA

SparkQA commented Jul 23, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull

@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17043/consoleFull

@davies
Contributor Author

davies commented Jul 23, 2014

The last commit fixed the tests; they should be run again.

@mateiz
Contributor

mateiz commented Jul 23, 2014

Looks like the latest tested code has an error in the test suite:

Running PySpark tests. Output is in python/unit-tests.log.
Running test: pyspark/rdd.py
  File "pyspark/rdd.py", line 1239
    def add_shuffle_key(split, iterator):
      ^
SyntaxError: invalid syntax
Had test failures; see logs.

@mateiz
Contributor

mateiz commented Jul 23, 2014

Ah never mind.

@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull


Unfortunately, memory_info() only works in psutil 2.0. I tried the Anaconda Python distribution on Mac, which has psutil 1.2.1, and it doesn't work there; you have to use get_memory_info() instead.
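A version-tolerant sketch that handles both psutil APIs (and the case where psutil is not installed at all) might look like this:

```python
import os

def get_used_memory():
    """Return this process's resident set size in MB, working with both
    psutil 1.x (get_memory_info) and psutil 2.x+ (memory_info)."""
    try:
        import psutil
    except ImportError:
        return 0  # psutil unavailable; caller must treat 0 as "unknown"
    process = psutil.Process(os.getpid())
    if hasattr(process, "memory_info"):   # psutil >= 2.0
        info = process.memory_info()
    else:                                 # psutil 1.x
        info = process.get_memory_info()
    return info.rss >> 20                 # bytes -> MB
```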


Also, maybe don't call the process "self"; it's confusing since it sounds like a "this" object.

@SparkQA

SparkQA commented Jul 24, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull

@SparkQA

SparkQA commented Jul 24, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull


If you want these unit tests to be run by Jenkins, you need to also call this file in python/run-tests. Seems worthwhile since there are some tests in ExternalMerger.

@SparkQA

SparkQA commented Jul 24, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull

@SparkQA

SparkQA commented Jul 24, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull

@mateiz
Contributor

mateiz commented Jul 24, 2014

Hey Davies, I tried this out a bit and saw two issues / areas for improvement:

  1. Since the ExternalMerger is used in both map tasks and reduce tasks, one problem that can happen is that the reduce task's data is already hashed modulo the # of reduce tasks, and so you get many empty buckets. For example, if you have 2 reduce tasks, task 0 gets all the values whose hash code is even, so it can only use half its buckets. If you have 64 reduce tasks, only one bucket is used.

The best way to fix this would be to hash values with a random hash function when choosing the bucket. One simple way might be to generate a random integer X for each ExternalMerger and then take hash((key, X)) instead of hash(key) when choosing the bucket. This is equivalent to salting your hash function. Maybe you have other ideas but I'd suggest trying this first.

  2. I also noticed that sometimes the maps would fill up again before the old memory was fully freed, leading to smaller spills. For example, for (Int, Int) pairs the first spill from 512 MB of memory is about 68 MB of files, but later spills were only around 20 MB. I found that I could get better performance overall by adding gc.collect() calls after every data.clear() and pdata.clear(). This freed memory faster and allowed more work to be done in memory before spilling. The performance difference for one test job was around 30%, but you should try it on your own jobs.
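The salting idea described above could be sketched like this (a hypothetical helper, not the PR's actual code):

```python
import random

class SaltedPartitioner:
    """Pick spill buckets with a per-merger random salt, so keys whose hashes
    were already restricted by the upstream hash(key) % num_reducers
    partitioning still spread evenly over all buckets."""
    def __init__(self, num_buckets):
        self.num_buckets = num_buckets
        self.salt = random.randint(0, 2 ** 31 - 1)  # the random integer X

    def bucket(self, key):
        # hash((key, X)) instead of hash(key) decorrelates the bucket choice
        # from the upstream reduce-task partitioning
        return hash((key, self.salt)) % self.num_buckets
```

With plain hash(key) % 64, the keys sent to reduce task 0 of 2 (all even hash codes) can only ever reach the even-numbered buckets; the salted version spreads them across all 64.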

@mateiz
Contributor

mateiz commented Jul 24, 2014

BTW here's a patch that adds the GC calls I talked about above: https://gist.github.com/mateiz/297b8618ed033e7c8005
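A sketch of where those GC calls would go in the spill path (the method names data.clear() and pdata.clear() are taken from the discussion above; the disk-writing step is elided):

```python
import gc

def spill_and_free(data, pdata):
    """After dumping the in-memory maps to disk, clear them and force a GC
    cycle so their memory is actually released before the maps fill up again."""
    # ... write the contents of data / pdata to partition files here ...
    data.clear()
    pdata.clear()
    gc.collect()  # reclaim the cleared dicts' memory promptly, allowing
                  # larger in-memory batches before the next spill
```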

@SparkQA

SparkQA commented Jul 25, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull

@SparkQA

SparkQA commented Jul 25, 2014

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull

@SparkQA

SparkQA commented Jul 25, 2014

QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull

@SparkQA

SparkQA commented Jul 25, 2014

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull

@mateiz
Contributor

mateiz commented Jul 25, 2014

Thanks Davies. I've merged this in.

@asfgit asfgit closed this in 14174ab Jul 25, 2014
@davies
Contributor Author

davies commented Jul 25, 2014

Awesome!

@davies davies deleted the spill branch July 29, 2014 00:42
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
During aggregation in the Python worker, if memory usage exceeds spark.executor.memory, it will perform disk-spilling aggregation.

It splits the aggregation into multiple stages; in each stage, it partitions the aggregated data by hash and dumps the partitions to disk. After all the data have been aggregated, it merges all the stages together (partition by partition).

Author: Davies Liu <[email protected]>

Closes apache#1460 from davies/spill and squashes the following commits:

cad91bf [Davies Liu] call gc.collect() after data.clear() to release memory as much as possible.
37d71f7 [Davies Liu] balance the partitions
902f036 [Davies Liu] add shuffle.py into run-tests
dcf03a9 [Davies Liu] fix memory_info() of psutil
67e6eba [Davies Liu] comment for MAX_TOTAL_PARTITIONS
f6bd5d6 [Davies Liu] rollback next_limit() again, the performance difference is huge:
e74b785 [Davies Liu] fix code style and change next_limit to memory_limit
400be01 [Davies Liu] address all the comments
6178844 [Davies Liu] refactor and improve docs
fdd0a49 [Davies Liu] add long doc string for ExternalMerger
1a97ce4 [Davies Liu] limit used memory and size of objects in partitionBy()
e6cc7f9 [Davies Liu] Merge branch 'master' into spill
3652583 [Davies Liu] address comments
e78a0a0 [Davies Liu] fix style
24cec6a [Davies Liu] get local directory by SPARK_LOCAL_DIR
57ee7ef [Davies Liu] update docs
286aaff [Davies Liu] let spilled aggregation in Python configurable
e9a40f6 [Davies Liu] recursive merger
6edbd1f [Davies Liu] Hash based disk spilling aggregation