From 2d3315a7dab429abc4d9ef5ed7f8f5484e8421f1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 16 Nov 2018 09:46:31 +0800 Subject: [PATCH 1/6] Disable 'spark.executor.pyspark.memory' on Windows always --- .../spark/api/python/PythonRunner.scala | 7 ++++++- docs/configuration.md | 2 +- python/pyspark/worker.py | 20 ++++++++++++------- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f73e95eac8f79..fb42fe4f1c711 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { + // Windows currently does not have 'resource' Python module that is required in worker.py + None + } else { + conf.get(PYSPARK_EXECUTOR_MEMORY) .map(_ / conf.getInt("spark.executor.cores", 1)) + } // All the Python functions should have the same exec, version and envvars. protected val envVars = funcs.head.funcs.head.envVars diff --git a/docs/configuration.md b/docs/configuration.md index 2915fb5fa9197..10403429e50b4 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -189,7 +189,7 @@ of the most common options to set are: limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory - is added to executor resource requests. + is added to executor resource requests. This configuration is not supported on Windows. diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8c59f1f999f18..3e458bc2e8e39 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -22,7 +22,11 @@ import os import sys import time -import resource +# 'resource' is a Unix specific package. +try: + import resource +except ImportError: + pass import socket import traceback @@ -268,9 +272,11 @@ def main(infile, outfile): # set up memory limits memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) - total_memory = resource.RLIMIT_AS - try: - if memory_limit_mb > 0: + # 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on + # resource package which is a Unix specific package. 
+ if memory_limit_mb > 0: + total_memory = resource.RLIMIT_AS + try: (soft_limit, hard_limit) = resource.getrlimit(total_memory) msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit) print(msg, file=sys.stderr) @@ -283,9 +289,9 @@ def main(infile, outfile): print(msg, file=sys.stderr) resource.setrlimit(total_memory, (new_limit, new_limit)) - except (resource.error, OSError, ValueError) as e: - # not all systems support resource limits, so warn instead of failing - print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr) + except (resource.error, OSError, ValueError) as e: + # not all systems support resource limits, so warn instead of failing + print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr) # initialize global state taskContext = None From 52a91cc887462227caf65eb85c0f01d5e8fd0485 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 17 Nov 2018 14:41:17 +0800 Subject: [PATCH 2/6] Add flag for resource package --- python/pyspark/worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 3e458bc2e8e39..8cc828e368986 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,10 +23,11 @@ import sys import time # 'resource' is a Unix specific package. +has_resource_package = True try: import resource except ImportError: - pass + has_resource_package = False import socket import traceback @@ -274,7 +275,7 @@ def main(infile, outfile): memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) # 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on # resource package which is a Unix specific package. - if memory_limit_mb > 0: + if memory_limit_mb > 0 and has_resource_package: total_memory = resource.RLIMIT_AS try: (soft_limit, hard_limit) = resource.getrlimit(total_memory) From fd92a4e1cee9f666d7ee6f9c9fcb45367c8132a8 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 22 Nov 2018 07:35:01 +0800 Subject: [PATCH 3/6] Address comments --- docs/configuration.md | 4 +++- python/pyspark/worker.py | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 10403429e50b4..0b67101003c16 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -189,7 +189,9 @@ of the most common options to set are: limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory - is added to executor resource requests. This configuration is not supported on Windows. + is added to executor resource requests. + + NOTE: This configuration is not supported on Windows. diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8cc828e368986..7dc06df9d70df 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -22,12 +22,12 @@ import os import sys import time -# 'resource' is a Unix specific package. -has_resource_package = True +# 'resource' is a Unix specific module. 
+has_resource_module = True try: import resource except ImportError: - has_resource_package = False + has_resource_module = False import socket import traceback @@ -274,8 +274,8 @@ def main(infile, outfile): # set up memory limits memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) # 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on - # resource package which is a Unix specific package. - if memory_limit_mb > 0 and has_resource_package: + # resource module which is a Unix specific module. + if memory_limit_mb > 0 and has_resource_module: total_memory = resource.RLIMIT_AS try: (soft_limit, hard_limit) = resource.getrlimit(total_memory) From 741d64a764fbb5d1b4d10b9799e8aa4309ad4028 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 30 Nov 2018 10:12:44 +0800 Subject: [PATCH 4/6] Remove JVM side check --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index fb42fe4f1c711..f73e95eac8f79 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -74,13 +74,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = if (Utils.isWindows) { - // Windows currently does not have 'resource' Python module that is required in worker.py - None - } else { - conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) .map(_ / conf.getInt("spark.executor.cores", 1)) - } // All the Python functions should have the same exec, version and envvars. protected val envVars = funcs.head.funcs.head.envVars From bd1acf22dd447ce5dc6091ba6fc9a44cd063fae0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 30 Nov 2018 10:37:59 +0800 Subject: [PATCH 5/6] Fix comment --- python/pyspark/worker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 7dc06df9d70df..953b468e96519 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -273,8 +273,6 @@ def main(infile, outfile): # set up memory limits memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) - # 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on - # resource module which is a Unix specific module. if memory_limit_mb > 0 and has_resource_module: total_memory = resource.RLIMIT_AS try: From 5fe1c0976acaaca444d98ce0dff82eb92d82076a Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Sat, 1 Dec 2018 10:24:42 +0800 Subject: [PATCH 6/6] Improve doc --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 0b67101003c16..4337f4f517c35 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -191,7 +191,7 @@ of the most common options to set are: shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. - NOTE: This configuration is not supported on Windows. 
+ NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.
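Taken together, the series converges on a purely Python-side guard: patch 4 drops the Utils.isWindows special case in PythonRunner.scala, and worker.py simply skips the memory cap whenever the Unix-only resource module cannot be imported. The standalone sketch below illustrates that final pattern outside of Spark. It is an illustration, not a copy of worker.py: the demo_set_memory_limit() wrapper and the __main__ stanza are invented for the demo, and the new_limit computation is reconstructed from the surrounding hunks rather than taken from context lines shown here.

    from __future__ import print_function

    import os
    import sys

    # 'resource' is a Unix specific module; on Windows (or any platform
    # without it) the import fails and the memory cap below is skipped.
    has_resource_module = True
    try:
        import resource
    except ImportError:
        has_resource_module = False


    def demo_set_memory_limit():
        # The JVM side exports the per-worker budget (spark.executor.pyspark.memory
        # divided by spark.executor.cores); -1 means no limit was requested.
        memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
        if memory_limit_mb > 0 and has_resource_module:
            total_memory = resource.RLIMIT_AS
            try:
                (soft_limit, hard_limit) = resource.getrlimit(total_memory)
                print("Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit),
                      file=sys.stderr)
                # Cap the worker's address space at the requested number of mebibytes.
                new_limit = memory_limit_mb * 1024 * 1024
                print("Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit),
                      file=sys.stderr)
                resource.setrlimit(total_memory, (new_limit, new_limit))
            except (resource.error, OSError, ValueError) as e:
                # not all systems support resource limits, so warn instead of failing
                print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)


    if __name__ == "__main__":
        demo_set_memory_limit()

For end-to-end use, the configuration described in the docs hunk would be set with something like --conf spark.executor.pyspark.memory=2g; PythonRunner.scala divides that allocation by spark.executor.cores and, presumably via the PYSPARK_EXECUTOR_MEMORY_MB environment variable read above, each worker caps its own address space. Because the worker only warns when setrlimit is unavailable or fails, removing the JVM-side check in patch 4 is safe: platforms without resource limiting simply fall back to unlimited Python memory use, exactly as the doc note added in patch 6 describes.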