From 1ead01db37e214be60fef8f9c1056661e24ddd15 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Tue, 28 Apr 2020 21:23:19 +0800
Subject: [PATCH 1/5] init

---
 .../apache/spark/api/python/PythonRDD.scala | 17 ++++++
 python/pyspark/rdd.py                       | 11 ++++
 python/pyspark/tests/test_rdd.py            | 57 +++++++++++++++++++
 3 files changed, 85 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 6dc1721f56adf..a97bf00ac434e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -168,6 +168,23 @@ private[spark] object PythonRDD extends Logging {
     serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
   }
 
+  /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   * This method is similar to `PythonRDD.collectAndServe`, but the caller can also specify the
+   * job group id, job description, and interruptOnCancel option.
+   *
+   * Note: This method is temporary and might be removed in the future.
+   */
+  def collectAndServeWithJobGroup[T](
+      rdd: RDD[T],
+      groupId: String,
+      description: String,
+      interruptOnCancel: Boolean): Array[Any] = {
+    val sc = rdd.sparkContext
+    sc.setJobGroup(groupId, description, interruptOnCancel)
+    serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
+  }
+
   /**
    * A helper function to create a local RDD iterator and serve it via socket. Partitions are
    * collected as separate jobs, by order of index. Partition data is first requested by a
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fbf645d10ee86..65de8b79a5ea1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -877,6 +877,17 @@ def collect(self):
             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
 
+    def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
+        """
+        When collecting an RDD, use this method to specify a job group.
+
+        .. note:: This method is temporary and might be removed in the future.
+        """
+        with SCCallSiteSync(self.context) as css:
+            sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup(
+                self._jrdd.rdd(), groupId, description, interruptOnCancel)
+        return list(_load_from_socket(sock_info, self._jrdd_deserializer))
+
     def reduce(self, f):
         """
         Reduces the elements of this RDD using the specified commutative and
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 31c5a7510a165..5fffe8f697830 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -814,6 +814,63 @@ def assert_request_contents(exec_reqs, task_reqs):
         rddWithoutRp = self.sc.parallelize(range(10))
         self.assertEqual(rddWithoutRp.getResourceProfile(), None)
 
+    def test_collect_with_job_group(self):
+        import time
+        import threading
+
+        group_A_name = "group_A"
+        group_B_name = "group_B"
+
+        def map_func(x):
+            time.sleep(3)
+            return x + 1
+
+        num_threads = 4
+        thread_list = []
+        # an array which record whether job is cancelled.
+        # the index of the array is the thread index which job run in.
+        is_job_cancelled = [False for x in range(num_threads)]
+
+        def run_job(job_group, index):
+            try:
+                result = self.sc.parallelize([3]).map(map_func).collectWithJobGroup(
+                    job_group, "test rdd collect with setting job group")
+                is_job_cancelled[index] = False
+                return result
+            except Exception as e:
+                is_job_cancelled[index] = True
+                return None
+
+        def launch_job_thread(job_group, index):
+            thread = threading.Thread(target=run_job, args=(job_group, index))
+            thread.start()
+            return thread
+
+        # test job succeeded when not cancelled.
+        run_job(group_A_name, 0)
+        self.assertFalse(is_job_cancelled[0], "job didn't succeeded.")
+
+        # launch spark job in multiple threads and cancel half of them.
+        for i in range(num_threads):
+            if i % 2 == 0:
+                thread = launch_job_thread(group_A_name, i)
+            else:
+                thread = launch_job_thread(group_B_name, i)
+            thread_list.append(thread)
+
+        time.sleep(1)
+        self.sc.cancelJobGroup(group_A_name)
+
+        for i in range(num_threads):
+            thread_list[i].join()
+            if i % 2 == 0:
+                # make sure group A job being cancelled.
+                self.assertTrue(is_job_cancelled[i], "Job in group A wasn't cancelled.")
+            else:
+                # make sure group B job succeeded.
+                self.assertFalse(is_job_cancelled[i], "Job in group B didn't succeeded.")
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_rdd import *

From 6f80066f5b2df7eb8d4f2c96207cace7981868bb Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Wed, 29 Apr 2020 09:12:22 +0800
Subject: [PATCH 2/5] update

---
 python/pyspark/rdd.py            |  3 +
 python/pyspark/tests/test_rdd.py | 99 +++++++++++++++++---------------
 2 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 65de8b79a5ea1..4662caf2e60e9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -879,9 +879,12 @@ def collect(self):
 
     def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
         """
+        .. note:: Experimental
+
         When collecting an RDD, use this method to specify a job group.
 
         .. note:: This method is temporary and might be removed in the future.
+        .. versionadded:: 3.0.0
         """
         with SCCallSiteSync(self.context) as css:
             sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup(
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 5fffe8f697830..6d0fc4e73c435 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -814,61 +814,66 @@ def assert_request_contents(exec_reqs, task_reqs):
         rddWithoutRp = self.sc.parallelize(range(10))
         self.assertEqual(rddWithoutRp.getResourceProfile(), None)
 
-    def test_collect_with_job_group(self):
-        import time
+    def test_multiple_group_jobs(self):
         import threading
+        group_a = "job_ids_to_cancel"
+        group_b = "job_ids_to_run"
 
-        group_A_name = "group_A"
-        group_B_name = "group_B"
+        threads = []
+        thread_ids = range(4)
+        thread_ids_to_cancel = [i for i in thread_ids if i % 2 == 0]
+        thread_ids_to_run = [i for i in thread_ids if i % 2 != 0]
 
-        def map_func(x):
-            time.sleep(3)
-            return x + 1
-
-        num_threads = 4
-        thread_list = []
-        # an array which record whether job is cancelled.
-        # the index of the array is the thread index which job run in.
-        is_job_cancelled = [False for x in range(num_threads)]
+        # A list which records whether each job was cancelled.
+        # The index of the list is the index of the thread the job runs in.
+        is_job_cancelled = [False for _ in thread_ids]
 
         def run_job(job_group, index):
+            """
+            Executes a job with the group ``job_group``. Each job waits for 15 seconds
+            and then exits.
+            """
             try:
-                result = self.sc.parallelize([3]).map(map_func).collectWithJobGroup(
-                    job_group, "test rdd collect with setting job group")
+                self.sc.setJobGroup(job_group, "test rdd collect with setting job group")
+                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect()
                 is_job_cancelled[index] = False
-                return result
-            except Exception as e:
+            except Exception:
+                # Assume that exception means job cancellation.
                 is_job_cancelled[index] = True
-                return None
-
-        def launch_job_thread(job_group, index):
-            thread = threading.Thread(target=run_job, args=(job_group, index))
-            thread.start()
-            return thread
-
-        # test job succeeded when not cancelled.
-        run_job(group_A_name, 0)
-        self.assertFalse(is_job_cancelled[0], "job didn't succeeded.")
-
-        # launch spark job in multiple threads and cancel half of them.
-        for i in range(num_threads):
-            if i % 2 == 0:
-                thread = launch_job_thread(group_A_name, i)
-            else:
-                thread = launch_job_thread(group_B_name, i)
-            thread_list.append(thread)
-
-        time.sleep(1)
-        self.sc.cancelJobGroup(group_A_name)
-
-        for i in range(num_threads):
-            thread_list[i].join()
-            if i % 2 == 0:
-                # make sure group A job being cancelled.
-                self.assertTrue(is_job_cancelled[i], "Job in group A wasn't cancelled.")
-            else:
-                # make sure group B job succeeded.
-                self.assertFalse(is_job_cancelled[i], "Job in group B didn't succeeded.")
+
+        # Test that the job succeeds when it is not cancelled.
+        run_job(group_a, 0)
+        self.assertFalse(is_job_cancelled[0])
+
+        # Run jobs
+        for i in thread_ids_to_cancel:
+            t = threading.Thread(target=run_job, args=(group_a, i))
+            t.start()
+            threads.append(t)
+
+        for i in thread_ids_to_run:
+            t = threading.Thread(target=run_job, args=(group_b, i))
+            t.start()
+            threads.append(t)
+
+        # Wait to make sure all the jobs have been submitted and are running.
+        time.sleep(3)
+        # And then, cancel one job group.
+        self.sc.cancelJobGroup(group_a)
+
+        # Wait until all threads launching jobs are finished.
+        for t in threads:
+            t.join()
+
+        for i in thread_ids_to_cancel:
+            self.assertTrue(
+                is_job_cancelled[i],
+                "Thread {i}: Job in group A was not cancelled.".format(i=i))
+
+        for i in thread_ids_to_run:
+            self.assertFalse(
+                is_job_cancelled[i],
+                "Thread {i}: Job in group B did not succeed.".format(i=i))
 
 
 if __name__ == "__main__":

From 481bba62ce62a13f23ce153e54e8a5f56f6059c2 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Wed, 29 Apr 2020 09:15:02 +0800
Subject: [PATCH 3/5] update

---
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 --
 python/pyspark/rdd.py                                            | 1 -
 2 files changed, 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a97bf00ac434e..a577194a48006 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -172,8 +172,6 @@ private[spark] object PythonRDD extends Logging {
    * A helper function to collect an RDD as an iterator, then serve it via socket.
    * This method is similar to `PythonRDD.collectAndServe`, but the caller can also specify the
    * job group id, job description, and interruptOnCancel option.
-   *
-   * Note: This method is temporary and might be removed in the future.
   */
  def collectAndServeWithJobGroup[T](
      rdd: RDD[T],
      groupId: String,
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4662caf2e60e9..d0ac000ba3208 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -883,7 +883,6 @@ def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
 
         When collecting an RDD, use this method to specify a job group.
 
-        .. note:: This method is temporary and might be removed in the future.
         .. versionadded:: 3.0.0
         """
         with SCCallSiteSync(self.context) as css:
             sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup(

From 91cf1c6f48e95ae7344ec09399e5005be1f9d78f Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 30 Apr 2020 07:30:37 +0800
Subject: [PATCH 4/5] fix test

---
 python/pyspark/tests/test_rdd.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 6d0fc4e73c435..c300bce2ad5fb 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -835,7 +835,8 @@ def run_job(job_group, index):
             """
             try:
                 self.sc.setJobGroup(job_group, "test rdd collect with setting job group")
-                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect()
+                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect(
+                    job_group, "test rdd collect with setting job group")
                 is_job_cancelled[index] = False
             except Exception:
                 # Assume that exception means job cancellation.

From bdd77fecc886a7b94a66c8c3dfd27f8923c22015 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 30 Apr 2020 15:24:11 +0800
Subject: [PATCH 5/5] fix

---
 python/pyspark/tests/test_rdd.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index c300bce2ad5fb..62ad4221d7078 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -834,9 +834,8 @@ def run_job(job_group, index):
             and then exits.
             """
             try:
-                self.sc.setJobGroup(job_group, "test rdd collect with setting job group")
-                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect(
-                    job_group, "test rdd collect with setting job group")
+                self.sc.parallelize([15]).map(lambda x: time.sleep(x)) \
+                    .collectWithJobGroup(job_group, "test rdd collect with setting job group")
                 is_job_cancelled[index] = False
             except Exception:
                 # Assume that exception means job cancellation.
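
Usage sketch for the API added by this series, mirroring what the test exercises. This is a minimal example, assuming an already-created SparkContext `sc`; the group id "demo_group", the description string, and the 30-second sleep are illustrative values, not part of the patches.

    import threading
    import time

    def collect_in_group():
        # Run the collect under the job group "demo_group", with
        # interruptOnCancel=True so cancellation interrupts running tasks.
        try:
            return (sc.parallelize([30])
                    .map(lambda x: time.sleep(x) or x)
                    .collectWithJobGroup("demo_group", "demo collect", True))
        except Exception:
            # Cancelling the group below makes the collect raise an error.
            return None

    t = threading.Thread(target=collect_in_group)
    t.start()
    time.sleep(1)                     # give the job time to start
    sc.cancelJobGroup("demo_group")   # cancels every job submitted under "demo_group"
    t.join()

Because the group id is attached to the collect call itself rather than to thread-local state on the SparkContext, jobs launched from multiple Python threads can be cancelled per group without interfering with each other.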