Skip to content

Commit 75c7726

Browse files
YikunHyukjinKwon
authored andcommitted
[SPARK-37498][PYTHON] Add eventually for test_reuse_worker_of_parallelize_range
Add eventually for test_reuse_worker_of_parallelize_range Avoid test_reuse_worker_of_parallelize_range becoming flaky when resources are tight or some other reason No UT passed. Closes #35228 from Yikun/SPARK-37498. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 732477b) Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 3711a81 commit 75c7726

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

python/pyspark/tests/test_worker.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from py4j.protocol import Py4JJavaError
3131

3232
from pyspark import SparkConf, SparkContext
33-
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
33+
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest, eventually
3434

3535

3636
class WorkerTests(ReusedPySparkTestCase):
@@ -181,11 +181,15 @@ def f():
181181
class WorkerReuseTest(PySparkTestCase):
182182

183183
def test_reuse_worker_of_parallelize_range(self):
184-
rdd = self.sc.parallelize(range(20), 8)
185-
previous_pids = rdd.map(lambda x: os.getpid()).collect()
186-
current_pids = rdd.map(lambda x: os.getpid()).collect()
187-
for pid in current_pids:
188-
self.assertTrue(pid in previous_pids)
184+
def check_reuse_worker_of_parallelize_range():
185+
rdd = self.sc.parallelize(range(20), 8)
186+
previous_pids = rdd.map(lambda x: os.getpid()).collect()
187+
current_pids = rdd.map(lambda x: os.getpid()).collect()
188+
for pid in current_pids:
189+
self.assertTrue(pid in previous_pids)
190+
return True
191+
192+
eventually(check_reuse_worker_of_parallelize_range, catch_assertions=True)
189193

190194

191195
@unittest.skipIf(

0 commit comments

Comments
 (0)