From 44c622bf17ab642ef372d9a534b5bfc18c98a0da Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Mon, 3 Dec 2018 16:02:35 +0800
Subject: [PATCH 1/4] Add support to run specific unittests and/or doctests in
 python/run-tests script

---
 python/run-tests-with-coverage |  2 -
 python/run-tests.py            | 86 ++++++++++++++++++++++------------
 2 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/python/run-tests-with-coverage b/python/run-tests-with-coverage
index 6d74b563e9140..457821037d43c 100755
--- a/python/run-tests-with-coverage
+++ b/python/run-tests-with-coverage
@@ -50,8 +50,6 @@ export SPARK_CONF_DIR="$COVERAGE_DIR/conf"
 # This environment variable enables the coverage.
 export COVERAGE_PROCESS_START="$FWDIR/.coveragerc"
 
-# If you'd like to run a specific unittest class, you could do such as
-# SPARK_TESTING=1 ../bin/pyspark pyspark.sql.tests VectorizedUDFTests
 ./run-tests "$@"
 
 # Don't run coverage for the coverage command itself
diff --git a/python/run-tests.py b/python/run-tests.py
index 01a6e81264dd6..e47edd1e4badd 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -19,7 +19,7 @@
 
 from __future__ import print_function
 import logging
-from optparse import OptionParser
+from optparse import OptionParser, OptionGroup
 import os
 import re
 import shutil
@@ -93,17 +93,18 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
         "pyspark-shell"
     ]
     env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
-
-    LOGGER.info("Starting test(%s): %s", pyspark_python, test_name)
+    str_test_name = " ".join(test_name)
+    LOGGER.info("Starting test(%s): %s", pyspark_python, str_test_name)
     start_time = time.time()
     try:
         per_test_output = tempfile.TemporaryFile()
         retcode = subprocess.Popen(
-            [os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
+            (os.path.join(SPARK_HOME, "bin/pyspark"), ) + test_name,
             stderr=per_test_output, stdout=per_test_output, env=env).wait()
         shutil.rmtree(tmp_dir, ignore_errors=True)
     except:
-        LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
+        LOGGER.exception(
+            "Got exception while running %s with %s", str_test_name, pyspark_python)
         # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
         # this code is invoked from a thread other than the main thread.
         os._exit(1)
@@ -124,7 +125,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
         except:
             LOGGER.exception("Got an exception while trying to print failed test output")
         finally:
-            print_red("\nHad test failures in %s with %s; see logs." % (test_name, pyspark_python))
+            print_red("\nHad test failures in %s with %s; see logs." % (
+                str_test_name, pyspark_python))
             # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
@@ -140,7 +142,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
                 decoded_lines))
             skipped_counts = len(skipped_tests)
             if skipped_counts > 0:
-                key = (pyspark_python, test_name)
+                key = (pyspark_python, str_test_name)
                 SKIPPED_TESTS[key] = skipped_tests
             per_test_output.close()
     except:
@@ -152,11 +154,11 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
             os._exit(-1)
     if skipped_counts != 0:
         LOGGER.info(
-            "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name,
-            duration, skipped_counts)
+            "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python,
%s tests were skipped", pyspark_python, + str_test_name, duration, skipped_counts) else: LOGGER.info( - "Finished test(%s): %s (%is)", pyspark_python, test_name, duration) + "Finished test(%s): %s (%is)", pyspark_python, str_test_name, duration) def get_default_python_executables(): @@ -190,6 +192,20 @@ def parse_opts(): help="Enable additional debug logging" ) + group = OptionGroup(parser, "Developer Options") + group.add_option( + "--testnames", type="string", + default=None, + help=( + "A comma-separated list of specific modules, classes and functions of doctest " + "or unittest to test. " + "For example, 'pyspark.sql.foo' to run the module as unittests or doctests, " + "'pyspark.sql.tests FooTests' to run the specific class of unittests, " + "'pyspark.sql.tests FooTests.test_foo' to run the specific unittest in the class. " + "'--modules' option is ignored if they are given.") + ) + parser.add_option_group(group) + (opts, args) = parser.parse_args() if args: parser.error("Unsupported arguments: %s" % ' '.join(args)) @@ -213,25 +229,31 @@ def _check_coverage(python_exec): def main(): opts = parse_opts() - if (opts.verbose): + if opts.verbose: log_level = logging.DEBUG else: log_level = logging.INFO + should_test_modules = opts.testnames is None logging.basicConfig(stream=sys.stdout, level=log_level, format="%(message)s") LOGGER.info("Running PySpark tests. Output is in %s", LOG_FILE) if os.path.exists(LOG_FILE): os.remove(LOG_FILE) python_execs = opts.python_executables.split(',') - modules_to_test = [] - for module_name in opts.modules.split(','): - if module_name in python_modules: - modules_to_test.append(python_modules[module_name]) - else: - print("Error: unrecognized module '%s'. Supported modules: %s" % - (module_name, ", ".join(python_modules))) - sys.exit(-1) LOGGER.info("Will test against the following Python executables: %s", python_execs) - LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test]) + + if should_test_modules: + modules_to_test = [] + for module_name in opts.modules.split(','): + if module_name in python_modules: + modules_to_test.append(python_modules[module_name]) + else: + print("Error: unrecognized module '%s'. 
Supported modules: %s" % + (module_name, ", ".join(python_modules))) + sys.exit(-1) + LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test]) + else: + testnames_to_test = opts.testnames.split(',') + LOGGER.info("Will test the following Python tests: %s", testnames_to_test) task_queue = Queue.PriorityQueue() for python_exec in python_execs: @@ -246,16 +268,20 @@ def main(): LOGGER.debug("%s python_implementation is %s", python_exec, python_implementation) LOGGER.debug("%s version is: %s", python_exec, subprocess_check_output( [python_exec, "--version"], stderr=subprocess.STDOUT, universal_newlines=True).strip()) - for module in modules_to_test: - if python_implementation not in module.blacklisted_python_implementations: - for test_goal in module.python_test_goals: - heavy_tests = ['pyspark.streaming.tests', 'pyspark.mllib.tests', - 'pyspark.tests', 'pyspark.sql.tests', 'pyspark.ml.tests'] - if any(map(lambda prefix: test_goal.startswith(prefix), heavy_tests)): - priority = 0 - else: - priority = 100 - task_queue.put((priority, (python_exec, test_goal))) + if should_test_modules: + for module in modules_to_test: + if python_implementation not in module.blacklisted_python_implementations: + for test_goal in module.python_test_goals: + heavy_tests = ['pyspark.streaming.tests', 'pyspark.mllib.tests', + 'pyspark.tests', 'pyspark.sql.tests', 'pyspark.ml.tests'] + if any(map(lambda prefix: test_goal.startswith(prefix), heavy_tests)): + priority = 0 + else: + priority = 100 + task_queue.put((priority, (python_exec, (test_goal, )))) + else: + for test_goal in testnames_to_test: + task_queue.put((0, (python_exec, tuple(test_goal.split())))) # Create the target directory before starting tasks to avoid races. target_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'target')) From 8d3a5c563ab2029944e824ce15f7a09a44ce82ca Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 5 Dec 2018 10:40:41 +0800 Subject: [PATCH 2/4] Address comments --- python/run-tests.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/python/run-tests.py b/python/run-tests.py index e47edd1e4badd..88b9209712c3f 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -93,18 +93,17 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): "pyspark-shell" ] env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args) - str_test_name = " ".join(test_name) - LOGGER.info("Starting test(%s): %s", pyspark_python, str_test_name) + LOGGER.info("Starting test(%s): %s", pyspark_python, test_name) start_time = time.time() try: per_test_output = tempfile.TemporaryFile() retcode = subprocess.Popen( - (os.path.join(SPARK_HOME, "bin/pyspark"), ) + test_name, + [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(), stderr=per_test_output, stdout=per_test_output, env=env).wait() shutil.rmtree(tmp_dir, ignore_errors=True) except: LOGGER.exception( - "Got exception while running %s with %s", str_test_name, pyspark_python) + "Got exception while running %s with %s", test_name, pyspark_python) # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if # this code is invoked from a thread other than the main thread. os._exit(1) @@ -126,7 +125,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): LOGGER.exception("Got an exception while trying to print failed test output") finally: print_red("\nHad test failures in %s with %s; see logs." 
-                str_test_name, pyspark_python))
+                test_name, pyspark_python))
             # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
@@ -142,7 +141,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
                 decoded_lines))
             skipped_counts = len(skipped_tests)
             if skipped_counts > 0:
-                key = (pyspark_python, str_test_name)
+                key = (pyspark_python, test_name)
                 SKIPPED_TESTS[key] = skipped_tests
             per_test_output.close()
     except:
@@ -155,10 +154,10 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
     if skipped_counts != 0:
         LOGGER.info(
             "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python,
-            str_test_name, duration, skipped_counts)
+            test_name, duration, skipped_counts)
     else:
         LOGGER.info(
-            "Finished test(%s): %s (%is)", pyspark_python, str_test_name, duration)
+            "Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
 
 
 def get_default_python_executables():
@@ -278,10 +277,10 @@ def main():
                             priority = 0
                         else:
                             priority = 100
-                        task_queue.put((priority, (python_exec, (test_goal, ))))
+                        task_queue.put((priority, (python_exec, test_goal)))
         else:
             for test_goal in testnames_to_test:
-                task_queue.put((0, (python_exec, tuple(test_goal.split()))))
+                task_queue.put((0, (python_exec, test_goal)))
 
     # Create the target directory before starting tasks to avoid races.
     target_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'target'))

From 643cafc9ffa9b316152d2629e94a30d50100d1cd Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Wed, 5 Dec 2018 10:43:05 +0800
Subject: [PATCH 3/4] Reduce diff

---
 python/run-tests.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/python/run-tests.py b/python/run-tests.py
index 88b9209712c3f..a308560873c4f 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -92,6 +92,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
         "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
         "pyspark-shell"
     ]
+
     env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
     LOGGER.info("Starting test(%s): %s", pyspark_python, test_name)
     start_time = time.time()
@@ -102,8 +103,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
             stderr=per_test_output, stdout=per_test_output, env=env).wait()
         shutil.rmtree(tmp_dir, ignore_errors=True)
     except:
-        LOGGER.exception(
-            "Got exception while running %s with %s", test_name, pyspark_python)
+        LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
         # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
         # this code is invoked from a thread other than the main thread.
         os._exit(1)
@@ -124,8 +124,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
         except:
             LOGGER.exception("Got an exception while trying to print failed test output")
         finally:
-            print_red("\nHad test failures in %s with %s; see logs." % (
-                test_name, pyspark_python))
+            print_red("\nHad test failures in %s with %s; see logs." % (test_name, pyspark_python))
             # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
@@ -152,8 +152,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
             os._exit(-1)
     if skipped_counts != 0:
         LOGGER.info(
-            "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python,
%s tests were skipped", pyspark_python, - test_name, duration, skipped_counts) + "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name, + duration, skipped_counts) else: LOGGER.info( "Finished test(%s): %s (%is)", pyspark_python, test_name, duration) From bd23e01078deb90bcdba654ff82047603a462b2e Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 5 Dec 2018 10:44:10 +0800 Subject: [PATCH 4/4] Newline --- python/run-tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/run-tests.py b/python/run-tests.py index a308560873c4f..e45268c13769a 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -92,8 +92,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir), "pyspark-shell" ] - env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args) + LOGGER.info("Starting test(%s): %s", pyspark_python, test_name) start_time = time.time() try: