[SPARK-25021][K8S] Add spark.executor.pyspark.memory limit for K8S #22298
Conversation
@rdblue @holdenk for review. This contains both unit and integration tests that verify [SPARK-25004] for K8S.

      .delete()
  }
  // TODO: [SPARK-25291] This test is flaky with regards to memory of executors
@mccheah This test periodically fails to set the proper memory for the executors. I have filed a JIRA: SPARK-25291
Kubernetes integration test starting

Test build #95519 has finished for PR 22298 at commit

Kubernetes integration test status success
holdenk left a comment
Did a quick first pass over this PR. Really excited to have this support in K8s as well as YARN, but I have some questions, especially around mixed-language pipelines.
Also really excited that the K8s integration tests are now integrated.
  @@ -0,0 +1,47 @@
  #
Is examples the right place for this?
It's the easiest place to put it so that it gets trivially picked up by the integration tests, as I did with pyfiles, but I am open to recommendations.
Shouldn't this be in the python tests (and get it to run only on certain cluster managers)?
That might be a good place for it. But the reason for this being in examples is that the integration tests can access it locally and we can see the success in the Jenkins environment. The K8s integration test suite does not run python run-tests.py, which means this test would not be part of the PRB.
I think the concern here is shipping a test as an example - this is the place where devs will be looking for examples of how to use pyspark, and having a memory test there is a bit strange.
| "Ensure that major Python version is either Python2 or Python3") | ||
| .createWithDefault("2") | ||
|
|
||
| val APP_RESOURCE_TYPE = |
Why this instead of the bools? What about folks who want to make a pipeline which is both R and Python?
The reason is that we are already running binding steps that configure the driver based on the app resource, so I thought we might as well pass the config down into the executors as part of that binding bootstrap step.
Currently, we don't have any Dockerfiles that handle mixed pipelines, so such configurations should be addressed in a follow-up PR, imo. But I am open to suggestions (that are testable).
Yeah, let's do something in a follow-up after 2.4.
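For context, a minimal sketch of how a string-typed, internal config entry like APP_RESOURCE_TYPE is typically declared with Spark's ConfigBuilder DSL; the key name, doc text, and wrapper object here are illustrative assumptions, not a quote of the PR:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hedged sketch only: assumes this lives inside the Spark codebase where
// ConfigBuilder is accessible; key and doc text are illustrative.
object KubernetesConfigSketch {
  val APP_RESOURCE_TYPE =
    ConfigBuilder("spark.kubernetes.resource.type")
      .doc("Internal marker for the primary app resource type (java, python, or r), " +
        "used to propagate language-specific settings such as pyspark memory to executors.")
      .internal()
      .stringConf
      .createOptional
}
```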
    .get(DRIVER_MEMORY_OVERHEAD)
    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
      MEMORY_OVERHEAD_MIN_MIB))
  // TODO: Have memory limit checks on driverMemory
Can we file a JIRA as well and include it in the comment?
I wanted to get an opinion from people (@mccheah) on whether we want to let the K8S API handle memory limits (via ResourceQuota limit errors) or whether we want to catch it with a Spark exception (if we were to include a configuration for memory limits).
Hm can you elaborate here? We already set the driver memory limit in this step based on the overhead.
Valid point. These are not necessary.
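To make the quoted computation concrete, here is a small worked sketch of the overhead math, assuming the usual 0.1 default for the overhead factor and 384 MiB for MEMORY_OVERHEAD_MIN_MIB; the driver memory value is just an example:

```scala
// Worked example of the overhead calculation in the diff above (illustrative values).
val driverMemoryMiB = 4096                                              // spark.driver.memory = 4g
val memoryOverheadMiB = math.max((0.1 * driverMemoryMiB).toInt, 384)    // max(409, 384) = 409
val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB   // 4505 MiB requested for the driver pod
```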
      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
      MEMORY_OVERHEAD_MIN_MIB))
  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
  // TODO: Have memory limit checks on executorMemory
Same as the other TODO.
  // TODO: Have memory limit checks on executorMemory
  private val executorMemoryTotal = kubernetesConf.sparkConf
    .getOption(APP_RESOURCE_TYPE.key).map { res =>
      val additionalPySparkMemory = if (res == "python") {
So this means that we couldn't turn this on in a mixed-language pipeline even if the pipeline author did some fun hacks (since we might need to support Scala/Python/R all at the same time). Is there a reason we did this instead of the YARN approach of isPython?
Also, style-wise I'd perhaps write this as a case statement instead. What do you think?
Well, if there is a mixed pipeline, the binding steps would need to be reconfigured to handle it. But currently, the language is determined only from the appResource.
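For reference, a hedged sketch of the match-based alternative, reusing the names from the quoted diff (executorMemoryWithOverhead, PYSPARK_EXECUTOR_MEMORY, APP_RESOURCE_TYPE); this is illustrative, not the code that was merged:

```scala
// Sketch only: same logic as the if/else in the diff, written as a match.
private val executorMemoryTotal = kubernetesConf.sparkConf
  .getOption(APP_RESOURCE_TYPE.key).map { res =>
    val additionalPySparkMemory = res match {
      case "python" =>
        kubernetesConf.sparkConf
          .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
      case _ => 0
    }
    executorMemoryWithOverhead + additionalPySparkMemory
  }.getOrElse(executorMemoryWithOverhead)
```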
  RUN apk add --no-cache R R-dev

  COPY R ${SPARK_HOME}/R
Is this change intentional?
This keeps the docker build from re-running the R and R-dev installation each time the jars are updated. It's a minor change that helps with dev :)
  # Removed the .cache to save space
  rm -r /root/.cache

  COPY python/lib ${SPARK_HOME}/python/lib
Same - is this change intentional?
Same as the R change above ^^
  test("Run PySpark with memory customization", k8sTestTag) {
    sparkAppConf
      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
nit: mildly confused why there are so many sets here (like the image, etc.) - maybe this would make more sense in a shared test setup func?
Some of this stuff can be factored out, I think; we just haven't done so yet. I wouldn't block a merge of this on such a refactor, but this entire test class could probably use some cleanup with respect to how the code is structured.
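One possible shape for that shared setup, with a hypothetical helper name and only the keys visible in this diff:

```scala
// Hypothetical helper: centralizes the PySpark image/version settings the
// memory tests repeat, so individual tests only set what they customize.
private def setBasePySparkConf(): Unit = {
  sparkAppConf
    .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
    .set("spark.kubernetes.pyspark.pythonVersion", "3")
}
```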
| .set("spark.kubernetes.pyspark.pythonVersion", "3") | ||
| .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") | ||
| .set("spark.executor.pyspark.memory", s"${additionalMemory}m") | ||
| .set("spark.python.worker.reuse", "false") |
Why is worker reuse being set to false?
in reference to #21977 (comment)
    sparkAppConf
      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
      .set("spark.kubernetes.pyspark.pythonVersion", "3")
      .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
Do we expect people who configure the rlimit advanced feature to also set the memoryOverheadConstant to a different value? If so, we should call it out in the docs. (Note: I think it would make sense for folks to set this to a lower value, so I think this would be the expected behaviour and we should document it, but I'm open to suggestions.)
I can add it to the docs, sure!
This is already covered in docs/running-on-kubernetes.md.
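For the docs discussion, a worked sketch of how these settings compose into the executor pod's memory request, following the executorMemoryTotal logic quoted earlier; the concrete numbers are illustrative only:

```scala
// Illustrative values: executor memory 2g, overhead factor 0.3, pyspark memory 512m.
val executorMemoryMiB = 2048
val overheadMiB = math.max((0.3 * executorMemoryMiB).toInt, 384)               // max(614, 384) = 614
val pysparkMemoryMiB = 512                                                     // spark.executor.pyspark.memory
val podMemoryRequestMiB = executorMemoryMiB + overheadMiB + pysparkMemoryMiB   // 3174 MiB
```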
Kubernetes integration test starting

Test build #95564 has finished for PR 22298 at commit

Kubernetes integration test status success

Looks fine to me, but I'm not familiar enough with the K8S code to have much of an opinion.

Test build #95570 has finished for PR 22298 at commit

https://issues.apache.org/jira/browse/SPARK-25291 looks like a real issue with the way the tests are written. So we don't necessarily want to ignore it for this patch, but we're still thinking about it.

Kubernetes integration test starting

Kubernetes integration test status success

@felixcheung @holdenk I have moved the PySpark example files to a more appropriate location. Any other comments before merge?

Kubernetes integration test starting

Kubernetes integration test status success

Test build #95686 has finished for PR 22298 at commit

Yeah this looks ok to me, would like a +1 from @felixcheung and @holdenk.

Kubernetes integration test starting

Kubernetes integration test status success
holdenk left a comment
Two minor nits: I think we should get rid of the set-to-false, but otherwise LGTM since K8s folks have already signed off on that part.
    SparkPod(pod.pod, withDriverArgs)
  }
  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
  override def getAdditionalPodSystemProperties(): Map[String, String] =
filed SPARK-25373 - Support mixed language pipelines on Spark on K8s
| .set("spark.kubernetes.pyspark.pythonVersion", "3") | ||
| .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") | ||
| .set("spark.executor.pyspark.memory", s"${additionalMemory}m") | ||
| .set("spark.python.worker.reuse", "false") |
I don't believe this should be set. Worker reuse is on by default in most systems, so I'm not sure this test should depend on worker reuse being false. As per @rdblue's investigation, this shouldn't impact this code path (and if it does, we need to re-open that investigation).
Kubernetes integration test starting

LGTM pending jenkins, see previous comments for details.

Kubernetes integration test status success

Test build #95809 has finished for PR 22298 at commit

Test build #95813 has finished for PR 22298 at commit
Merged to master. It's not a bug fix, but I think we should consider this for a backport to 2.4 since it's arguably the second half of a feature that's in 2.4. It doesn't backport cleanly as is, though, so maybe another PR just for the 2.4 branch.
+1 for 2.4 |
What changes were proposed in this pull request?
Add spark.executor.pyspark.memory limit for K8S
How was this patch tested?
Unit and Integration tests
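As a usage illustration of the feature described above (the image name and values here are examples, not taken from the PR):

```scala
import org.apache.spark.SparkConf

// Example only: requesting a dedicated PySpark worker memory limit on K8s.
val conf = new SparkConf()
  .set("spark.kubernetes.container.image", "myrepo/spark-py:latest")  // hypothetical image
  .set("spark.executor.memory", "2g")
  .set("spark.executor.pyspark.memory", "512m")  // now counted into the executor pod's memory request on K8s
```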