[SPARK-25004][CORE] Add spark.executor.pyspark.memory limit. #21977
Conversation
@holdenk, can you help review this since it is related to PySpark?
cc @ueshin
I'd be happy to. I've got a live review tomorrow, so I'll take a look at this then.
Should add pysparkWorkerMemory here too.
Maybe just switch it to use the total $executorMem instead?
I like having it broken out so users can see where their allocation is going. Otherwise, users that only know about spark.executor.memory might not know why their allocation is 1 GB higher when running PySpark. I've updated this to include the worker memory.
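To make the arithmetic concrete, here is a rough sketch of the container sizing being discussed; the function and the values are hypothetical illustrations, not the actual YARN client code.

```python
def yarn_container_request_mb(executor_memory_mb, overhead_mb, pyspark_memory_mb=0):
    """Keep the three terms separate so users can see where the allocation
    goes: JVM heap, overhead, and (optionally) PySpark worker memory."""
    return executor_memory_mb + overhead_mb + pyspark_memory_mb

# e.g. spark.executor.memory=4g, 384 MiB overhead, spark.executor.pyspark.memory=1g
print(yarn_container_request_mb(4096, 384, 1024))  # 5504 MiB requested per container
```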
python/pyspark/worker.py
Forgot to output the msg here?
Fixed.
Does this have applications in the other cluster managers that are considering overhead memory, like Kubernetes and Mesos?
tiny nit: indentation ..
Fixed.
cc @BryanCutler and @icexelloss too since we recently discussed memory issues.
This seems very applicable to add to Kubernetes as well. We already increased the DEFAULT_MEMORY_OVERHEAD to account for memory issues that arise when users forget to increase the memory overhead. Could this be expanded for that cluster manager as well? I'd be happy to help with appending to this PR (or in a follow-up) to include that.
holdenk left a comment
Thank you so much for this PR! It's really exciting to see a possible solution for one of the largest, longest-standing problems in PySpark.
I have a few questions for clarification, and I'd really love to see the test suite, even if it's not something for Jenkins usage, included somehow so that it can be part of the release verification.
I know we've made some good progress on having integration tests in K8s, so maybe we could have some good integration testing there eventually.
I'd also love @BryanCutler's feedback on anything in Arrow -- I don't think this would impact it, but I am curious how rlimits interplay with native allocations that things like Arrow might do (I'll do some more reading).
This is only a first pass with less than a full cup of coffee in me, so I'll come back and do a more thorough read-through, but once again I'm really excited to see this and hoping we can find a way to get it into Spark 2.4.
python/pyspark/worker.py
So the logic of this block appears to be: the user has requested a memory limit and Python does not have a memory limit set. If the user has requested a different memory limit than the one set, though, regardless of whether there is a current memory limit, would it make sense to set it?
It's also possible I've misunderstood the rlimit return values here.
That being said, even if that is the behaviour we want, should we use resource.RLIM_INFINITY to check if it's unlimited?
I've updated to use resource.RLIM_INFINITY.
I think this should only set the resource limit if it isn't already set. It is unlikely that it's already set because this is during worker initialization, but the intent is to not cause harm if a higher-level system (i.e. container provider) has already set the limit.
That makes sense. What if we only set the limit when ours is lower than the current limit? (e.g. I could see a container system setting a limit based on an assumption that doesn't hold once Spark is in the mix, and if we come up with a lower limit we could apply it.)
Works for me. I'll update this.
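For reference, a minimal sketch of the behaviour being agreed on here, assuming the limit arrives as a MiB value; this approximates the worker-side logic rather than quoting the actual patch.

```python
import sys

def set_worker_memory_limit(memory_limit_mb):
    """Apply RLIMIT_AS only when it would lower the current soft limit
    (or when no limit is set), so a stricter limit already imposed by a
    container runtime is left alone. Failures are logged, not raised."""
    try:
        import resource
    except ImportError:
        return  # e.g. Windows, where the resource module is unavailable
    new_limit = memory_limit_mb * 1024 * 1024  # MiB -> bytes
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        if soft == resource.RLIM_INFINITY or new_limit < soft:
            resource.setrlimit(resource.RLIMIT_AS, (new_limit, new_limit))
    except (ValueError, OSError) as err:
        print("WARN: failed to set memory limit: %s" % err, file=sys.stderr)
```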
Maybe just switch it to use the total $executorMem instead?
This is minor, but this code block is repeated; would it make sense to factor it out?
The other configuration options are already duplicated, so I was trying to make as few changes as possible.
Since there are several duplicated options, I think it makes more sense to pass the SparkConf through to PythonRunner so it can extract its own configuration.
@holdenk, would you like this refactor done in this PR, or should I do it in a follow-up?
I went ahead with the refactor.
Same repeated code block as mentioned.
It's been a while since I spent a lot of time thinking about how we launch our Python worker processes. Maybe it would make sense to add a comment here explaining the logic a bit more? Based on the documentation in PythonWorkerFactory, it appears we make the fork/not-fork decision not based on whether reuseWorker is set but instead on whether we're on Windows or not. Is that the logic this block was attempting to handle?
I thought the comments below were clear: if a single worker is reused, it gets the entire allocation. If each core starts its own worker, each one gets an equal share.
If reuseWorker is actually ignored, then this needs to be updated.
I think there might be a misunderstanding about what reuseWorker means. The workers will be reused, but the decision on whether we fork in Python is based on whether we are on Windows or not. How about we both go and read that code path and see if we reach the same understanding? I could be off too.
@ifilonenko, I opened follow-up SPARK-25021 for adding the PySpark memory allocation to Kubernetes. @mccheah, I opened follow-up SPARK-25022 for Mesos.
@holdenk, I attempted to write a YARN unit test for this, but evidently the MiniYARNCluster doesn't run python workers. The task is run, but a worker is never started. If you have any idea how to fix this, I think we could have an easy test. Here's what I have so far: https://gist.github.com/rdblue/9848a00f49eaad6126fbbcfa1b039e19
@rdblue I'll take a look at that test on Monday during one of my streams after I get something else done :)
Test build #95313 has finished for PR 21977 at commit
```scala
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
  .map(_ / conf.getInt("spark.executor.cores", 1))
```
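As an illustration of that division (the Scala above is the authoritative version), the per-worker share works out as below; the helper is made up, and PYSPARK_EXECUTOR_MEMORY_MB is, to my understanding, the environment variable the Python worker reads, so treat that name as an assumption.

```python
import os

def per_worker_memory_mb(pyspark_memory_mb, executor_cores):
    """Each concurrent Python worker gets an equal slice of the executor's
    PySpark memory, since the worker pool can grow to one worker per core."""
    return pyspark_memory_mb // max(executor_cores, 1)

# e.g. spark.executor.pyspark.memory=4g with spark.executor.cores=4
share = per_worker_memory_mb(4096, 4)  # 1024 MiB per worker
worker_env = dict(os.environ, PYSPARK_EXECUTOR_MEMORY_MB=str(share))
```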
@rdblue, I fixed the site to refer to Databricks's guide. Mind fixing this one if there are more changes to be pushed?
Sure, thanks for taking the time to clarify it.
@HyukjinKwon, sorry but it looks like this was merged before I could push a commit to update it.
Oh, it's fine. I meant to fix them together if there are more changes to push. Not a big deal.
The last couple of commits have failed a test case, but there have been no code changes since a passing test. I think master is just a bit flaky right now and that this PR is fine.
Test failure seems completely unrelated. Merging to master.
@vanzin, thanks for merging! And thanks to everyone for the reviews!
Sorry for a late input like this. I totally forgot about it and suddenly recalled it. Have you guys taken a look at spark.python.worker.memory? I think we should consolidate both into one configuration, or at least clarify how setting one relates to the other.
@HyukjinKwon, I haven't looked at spark.python.worker.memory before. Looks like this limit controls when data is spilled to disk. Do you know what data is spilled and what is accumulating in the Python worker? My understanding was that Python processes groups of rows (either pickled or in Arrow format) and doesn't typically hold data like the executor JVM does. More information here would be helpful to know what the right way to set this is.
I think the actual data is spilled to disk in Python RDD APIs during shuffle and/or aggregation, for instance in the spilling logic in python/pyspark/shuffle.py. Up to my knowledge, this configuration does not apply to SQL or Arrow-related APIs in Python, but only RDD APIs. During aggregation and/or batch processing, it looks inevitable to hold the groups in memory, and they might exceed the memory limit. In this case, data spills to disk as configured by this setting. I think basically we should just use one configuration for both.
BTW, I don't think many people use spark.python.worker.memory. I think we can just remove it and replace it with spark.executor.pyspark.memory.
@HyukjinKwon, I like the idea of using a single setting. The problem is that the first setting controls the total size of the address space and the second is a threshold that will cause data to be spilled. If the threshold for spilling is the total size limit, then Python would run out of memory before it started spilling data. I think it makes sense to have both settings. The JVM has executor memory and Spark memory (controlled by spark.memory.fraction), which is a similar split. I think that means the spill setting should have a better name and should be limited by the total memory, maybe by ensuring its max is spark.executor.pyspark.memory. I think we should avoid introducing yet another property for this if we can.
Yeah, I was thinking that way as well and that works for me too. Would you be willing to work on the follow-up?
Btw, if we're going to keep two separate configurations, let's clarify in the JIRA or PR the practical case where they would be set differently.
Oh, BTW, I don't think it's true since it already uses less memory than what it sets (see lines 1761 to 1762, line 626, and line 654 in c2d0d70).
Hey, do you guys want to capture this into a JIRA and broadcast to dev@ for visibility?
@felixcheung, good idea. I opened https://issues.apache.org/jira/browse/SPARK-26679 for this. I'm not sure that I'll have time to work on it since I'm working on some DSv2 pull requests. Others should feel free to claim it now and I can review. If no one claims it by the time I can work on it, I'll take it on.
Thanks all, I'll move to the JIRA.
FWIW, I hope all the reviewers here put some input into the JIRA. It's confusing to have both configurations and I think we should fix it.
```
<td>
  The amount of memory to be allocated to PySpark in each executor, in MiB
  unless otherwise specified. If set, PySpark memory for an executor will be
  limited to this amount. If not set, Spark will not limit Python's memory use
```
@rdblue, which OS did you test?
It doesn't work in my case in non-YARN (local mode) on my Mac and I suspect it's OS-specific.

```
$ ./bin/pyspark --conf spark.executor.pyspark.memory=1m
```

```python
def ff(iter):
    def get_used_memory():
        import psutil
        process = psutil.Process(os.getpid())
        info = process.memory_info()
        return info.rss
    import numpy
    a = numpy.arange(1024 * 1024 * 1024, dtype="u8")
    return [get_used_memory()]
sc.parallelize([], 1).mapPartitions(ff).collect()
```

```python
def ff(_):
    import sys, numpy
    a = numpy.arange(1024 * 1024 * 1024, dtype="u8")
    return [sys.getsizeof(a)]
sc.parallelize([], 1).mapPartitions(ff).collect()
```

Can you clarify how you tested in the PR description?

FYI, my Mac:

```python
>>> import resource
>>> size = 50 * 1024 * 1024
>>> resource.setrlimit(resource.RLIMIT_AS, (size, size))
>>> a = 'a' * size
```

at CentOS Linux release 7.5.1804 (Core):

```python
>>> import resource
>>> size = 50 * 1024 * 1024
>>> resource.setrlimit(resource.RLIMIT_AS, (size, size))
>>> a = 'a' * size
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
MemoryError
```

Looks like we should note this for clarification. For instance, we could just document that this feature is dependent on Python's resource module. WDYT?
Sounds fine to me. I tested in a linux environment.
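A quick standalone check (not part of this PR) for whether the current platform actually enforces RLIMIT_AS, which is what the macOS vs. CentOS difference above comes down to:

```python
import resource

def rlimit_as_enforced(probe_mb=50):
    """Lower the RLIMIT_AS soft limit, try to allocate past it, and report
    whether MemoryError was raised (it is on Linux; macOS typically accepts
    the limit but does not enforce it)."""
    size = probe_mb * 1024 * 1024
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (size, hard))
    try:
        _ = bytearray(2 * size)  # allocate more than the new soft limit
        return False
    except MemoryError:
        return True
    finally:
        resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
```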
…set via 'spark.executor.pyspark.memory'

## What changes were proposed in this pull request?

#21977 added a feature to limit Python worker resource limit. This PR is kind of a followup of it. It proposes to add a test that checks the actual resource limit set by 'spark.executor.pyspark.memory'.

## How was this patch tested?

Unit tests were added.

Closes #23663 from HyukjinKwon/test_rlimit.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>

…ndent on 'resource'

## What changes were proposed in this pull request?

This PR adds a note that explicitly `spark.executor.pyspark.memory` is dependent on the resource module's behaviours at Python memory usage. For instance, I at least see some difference at #21977 (comment)

## How was this patch tested?

Manually built the doc.

Closes #23664 from HyukjinKwon/note-resource-dependent.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer
    fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
    comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare
    permutations = sorted(permutations, reverse=True)
MemoryError
```

The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity.

Tested memory limits in our YARN cluster and verified that MemoryError is thrown.

Author: Ryan Blue <[email protected]>

Closes apache#21977 from rdblue/SPARK-25004-add-python-memory-limit.

(cherry picked from commit 7ad18ee)

Conflicts:
  core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
  python/pyspark/worker.py
  sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
What changes were proposed in this pull request?
This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer
    fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
    comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare
    permutations = sorted(permutations, reverse=True)
MemoryError
```

The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity.
How was this patch tested?
Tested memory limits in our YARN cluster and verified that MemoryError is thrown.
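For reference, a minimal way to exercise the new setting from a PySpark application; the 2g value and the app name are arbitrary choices for illustration.

```python
from pyspark.sql import SparkSession

# Cap each executor's Python workers at 2 GiB of address space; on YARN this
# amount is also added to the requested container memory.
spark = (SparkSession.builder
         .appName("pyspark-memory-limit-demo")
         .config("spark.executor.pyspark.memory", "2g")
         .getOrCreate())
```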