From 224916fd0fbaad80a15edc0834806715c0955024 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 28 Jan 2016 16:59:14 -0800
Subject: [PATCH] Add rpcEnv.awaitTermination() back to SparkEnv

`rpcEnv.awaitTermination()` was not added in #10854 because some Streaming
Python tests hung forever. This patch fixes that hang and adds
`rpcEnv.awaitTermination()` back to SparkEnv.

Previously, the Streaming Kafka Python tests shut down the ZooKeeper server
before stopping the StreamingContext. When the StreamingContext was then
stopped, the KafkaReceiver could hang due to
https://issues.apache.org/jira/browse/KAFKA-601, so some threads of RpcEnv's
Dispatcher could not exit and rpcEnv.awaitTermination() hung. The patch simply
changes the shutdown order to fix this.
---
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 +
 python/pyspark/streaming/tests.py                   | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 12c7b2048a8c..9461afdc5412 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -91,6 +91,7 @@ class SparkEnv (
     metricsSystem.stop()
     outputCommitCoordinator.stop()
     rpcEnv.shutdown()
+    rpcEnv.awaitTermination()
 
     // Note that blockTransferService is stopped by BlockManager since it is started by it.
 
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 24b812615cbb..b33e8252a7d3 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1013,12 +1013,12 @@ def setUp(self):
         self._kafkaTestUtils.setup()
 
     def tearDown(self):
+        super(KafkaStreamTests, self).tearDown()
+
         if self._kafkaTestUtils is not None:
             self._kafkaTestUtils.teardown()
             self._kafkaTestUtils = None
 
-        super(KafkaStreamTests, self).tearDown()
-
     def _randomTopic(self):
         return "topic-%d" % random.randint(0, 10000)
 
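
For context, the principle behind the tests.py change is that the streaming
side must be stopped while the broker fixture is still reachable, so the
receiver can disconnect cleanly and RpcEnv's Dispatcher threads can exit,
letting rpcEnv.awaitTermination() return. Below is a minimal, standard-library-only
Python sketch of that teardown ordering; FakeBrokerFixture, FakeStreamingHarness,
and OrderedTeardownExample are hypothetical stand-ins for KafkaTestUtils and the
real pyspark test classes, not the actual suite.

# Illustrative sketch of the teardown-ordering principle (hypothetical names).
import unittest


class FakeBrokerFixture(object):
    """Stand-in for KafkaTestUtils: owns the external service lifecycle."""

    def setup(self):
        self.running = True

    def teardown(self):
        self.running = False


class FakeStreamingHarness(unittest.TestCase):
    """Stand-in for the base streaming test case whose tearDown stops the
    StreamingContext (and, transitively, any receivers)."""

    def tearDown(self):
        # In the real suite this is where the StreamingContext is stopped;
        # receivers must still be able to reach the broker at this point,
        # or they can hang (see KAFKA-601).
        pass


class OrderedTeardownExample(FakeStreamingHarness):

    def setUp(self):
        self._broker = FakeBrokerFixture()
        self._broker.setup()

    def tearDown(self):
        # 1) Stop the streaming side first, while the broker is still up.
        super(OrderedTeardownExample, self).tearDown()
        # 2) Only then tear down the broker fixture.
        if self._broker is not None:
            self._broker.teardown()
            self._broker = None

    def test_broker_is_running_during_test(self):
        self.assertTrue(self._broker.running)


if __name__ == "__main__":
    unittest.main()

The ordering mirrors the patch: super(...).tearDown() runs before the fixture
teardown, which is exactly what the reordered KafkaStreamTests.tearDown does.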