Skip to content

Commit 335f10e

Browse files
zsxwingrxin
authored andcommitted
[SPARK-7997][CORE] Add rpcEnv.awaitTermination() back to SparkEnv
`rpcEnv.awaitTermination()` was not added in #10854 because some Streaming Python tests hung forever. This patch fixed the hung issue and added rpcEnv.awaitTermination() back to SparkEnv. Previously, Streaming Kafka Python tests shutdowns the zookeeper server before stopping StreamingContext. Then when stopping StreamingContext, KafkaReceiver may be hung due to https://issues.apache.org/jira/browse/KAFKA-601, hence, some thread of RpcEnv's Dispatcher cannot exit and rpcEnv.awaitTermination is hung.The patch just changed the shutdown order to fix it. Author: Shixiong Zhu <[email protected]> Closes #11031 from zsxwing/awaitTermination.
1 parent 0557146 commit 335f10e

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class SparkEnv (
9191
metricsSystem.stop()
9292
outputCommitCoordinator.stop()
9393
rpcEnv.shutdown()
94+
rpcEnv.awaitTermination()
9495

9596
// Note that blockTransferService is stopped by BlockManager since it is started by it.
9697

python/pyspark/streaming/tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,12 +1013,12 @@ def setUp(self):
10131013
self._kafkaTestUtils.setup()
10141014

10151015
def tearDown(self):
1016+
super(KafkaStreamTests, self).tearDown()
1017+
10161018
if self._kafkaTestUtils is not None:
10171019
self._kafkaTestUtils.teardown()
10181020
self._kafkaTestUtils = None
10191021

1020-
super(KafkaStreamTests, self).tearDown()
1021-
10221022
def _randomTopic(self):
10231023
return "topic-%d" % random.randint(0, 10000)
10241024

0 commit comments

Comments
 (0)