From b566b66dbf6dd380512f11fcee2ef77d0461522b Mon Sep 17 00:00:00 2001
From: Joshi
Date: Tue, 23 Jun 2015 21:42:29 -0700
Subject: [PATCH 01/12] SPARK-2645: Fix for SparkContext stop behavior

---
 .../scala/org/apache/spark/SparkEnv.scala | 28 ++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b0665570e268..5b05a17bcbe8 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -90,17 +90,25 @@ class SparkEnv (
   private var driverTmpDirToDelete: Option[String] = None
 
   private[spark] def stop() {
+
+    if(isStopped) return
+
     isStopped = true
-    pythonWorkers.foreach { case(key, worker) => worker.stop() }
-    Option(httpFileServer).foreach(_.stop())
-    mapOutputTracker.stop()
-    shuffleManager.stop()
-    broadcastManager.stop()
-    blockManager.stop()
-    blockManager.master.stop()
-    metricsSystem.stop()
-    outputCommitCoordinator.stop()
-    rpcEnv.shutdown()
+    try {
+      pythonWorkers.foreach { case (key, worker) => worker.stop()}
+      Option(httpFileServer).foreach(_.stop())
+      mapOutputTracker.stop()
+      shuffleManager.stop()
+      broadcastManager.stop()
+      blockManager.stop()
+      blockManager.master.stop()
+      metricsSystem.stop()
+      outputCommitCoordinator.stop()
+      rpcEnv.shutdown()
+    } catch {
+      case e: Exception =>
+        logInfo("Exception while SparkEnv stop", e)
+    }
 
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release

From 380c5b00c8b76546afab68cc39cd1f648b298856 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Wed, 24 Jun 2015 19:35:55 -0700
Subject: [PATCH 02/12] SPARK-2645: Fix for SparkContext stop behavior

---
 .../scala/org/apache/spark/SparkEnv.scala | 76 ++++++++++---------
 1 file changed, 39 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 5b05a17bcbe8..edfe570897d1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -45,6 +45,8 @@ import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
+import scala.util.control.NonFatal
+
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -91,46 +93,46 @@ class SparkEnv (
 
   private[spark] def stop() {
 
-    if(isStopped) return
-
-    isStopped = true
-    try {
-      pythonWorkers.foreach { case (key, worker) => worker.stop()}
-      Option(httpFileServer).foreach(_.stop())
-      mapOutputTracker.stop()
-      shuffleManager.stop()
-      broadcastManager.stop()
-      blockManager.stop()
-      blockManager.master.stop()
-      metricsSystem.stop()
-      outputCommitCoordinator.stop()
-      rpcEnv.shutdown()
-    } catch {
-      case e: Exception =>
-        logInfo("Exception while SparkEnv stop", e)
-    }
+    if(!isStopped) {
+      isStopped = true
+      try {
+        pythonWorkers.foreach { case (key, worker) => worker.stop()}
+        Option(httpFileServer).foreach(_.stop())
+        mapOutputTracker.stop()
+        shuffleManager.stop()
+        broadcastManager.stop()
+        blockManager.stop()
+        blockManager.master.stop()
+        metricsSystem.stop()
+        outputCommitCoordinator.stop()
+        rpcEnv.shutdown()
+      } catch {
+        case NonFatal(e) =>
+          logInfo("Exception while SparkEnv stop", e)
+      }
 
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    // actorSystem.awaitTermination()
-
-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
-      case Some(path) => {
-        try {
-          Utils.deleteRecursively(new File(path))
-        } catch {
-          case e: Exception =>
-            logWarning(s"Exception while deleting Spark temp dir: $path", e)
+      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+      // down, but let's call it anyway in case it gets fixed in a later release
+      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+      // actorSystem.awaitTermination()
+
+      // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+      // If we only stop sc, but the driver process still run as a services then we need to delete
+      // the tmp dir, if not, it will create too many tmp dirs.
+      // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+      // current working dir in executor which we do not need to delete.
+      driverTmpDirToDelete match {
+        case Some(path) => {
+          try {
+            Utils.deleteRecursively(new File(path))
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while deleting Spark temp dir: $path", e)
+          }
         }
+        case None => // We just need to delete tmp dir created by driver, so do nothing on executor
       }
-      case None => // We just need to delete tmp dir created by driver, so do nothing on executor
     }
   }

From 58dba70ac26fb77c1668a259a347603947787afe Mon Sep 17 00:00:00 2001
From: Joshi
Date: Wed, 24 Jun 2015 20:30:57 -0700
Subject: [PATCH 03/12] SPARK-2645: Fix for SparkContext stop behavior

---
 .../scala/org/apache/spark/SparkEnv.scala    |  2 +-
 .../org/apache/spark/SparkContextSuite.scala | 21 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index edfe570897d1..a4b511b4f0ea 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -93,7 +93,7 @@ class SparkEnv (
 
   private[spark] def stop() {
 
-    if(!isStopped) {
+    if (!isStopped) {
       isStopped = true
       try {
         pythonWorkers.foreach { case (key, worker) => worker.stop()}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6838b35ab4cc..765356c715a6 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -272,4 +273,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("calling multiple sc.stop() must not throw uncaught exception(50) from sparkenv") {
+    var threwNoOrOnlyExceptedException = true
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val cnt = sc.parallelize(1 to 4).count()
+      sc.cancelAllJobs()
+      sc.stop()
+      // call stop second time
+      sc.stop()
+    } catch {
+      case e: ServerStateException =>
+        // assert(!e.getMessage.contains("Server is already stopped"))
+        threwNoOrOnlyExceptedException = false
+      case NonFatal(e) =>
+        threwNoOrOnlyExceptedException = true
+    } finally {
+      assert(threwNoOrOnlyExceptedException == true)
+    }
+  }
 }

From 9193a0cf315f07f149846cac973d245f832797e4 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Thu, 25 Jun 2015 11:59:39 -0700
Subject: [PATCH 04/12] Fix for SparkContext stop behavior

---
 .../scala/org/apache/spark/SparkContextSuite.scala | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 765356c715a6..24a9334cb513 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -274,8 +274,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("calling multiple sc.stop() must not throw uncaught exception(50) from sparkenv") {
-    var threwNoOrOnlyExceptedException = true
+  test("calling multiple sc.stop() must not throw any exception") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
       val cnt = sc.parallelize(1 to 4).count()
@@ -284,13 +283,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       sc.cancelAllJobs()
       sc.stop()
       // call stop second time
       sc.stop()
     } catch {
-      case e: ServerStateException =>
-        // assert(!e.getMessage.contains("Server is already stopped"))
-        threwNoOrOnlyExceptedException = false
-      case NonFatal(e) =>
-        threwNoOrOnlyExceptedException = true
-    } finally {
-      assert(threwNoOrOnlyExceptedException == true)
+      case e: Exception =>
+        fail("calling multiple sc.stop() must not have thrown any exception");
     }
   }
 }

From a5a7d7f10693f06c51b57535e299236ebcfa37c0 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Thu, 25 Jun 2015 22:01:12 -0700
Subject: [PATCH 05/12] Fix for SparkContext stop behavior

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a4b511b4f0ea..b2fcc89045ff 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -107,8 +107,10 @@ class SparkEnv (
         outputCommitCoordinator.stop()
         rpcEnv.shutdown()
       } catch {
-        case NonFatal(e) =>
+        case e: ServerStateException =>
           logInfo("Exception while SparkEnv stop", e)
+        case NonFatal(e) =>
+          throw e;
       }
 
       // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut

From 72bb4840b413d80b55a323c0ce5b1d2059b49685 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Thu, 25 Jun 2015 22:04:56 -0700
Subject: [PATCH 06/12] Fix for SparkContext stop behavior

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b2fcc89045ff..c1ffe7840d9e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -107,9 +107,8 @@ class SparkEnv (
         outputCommitCoordinator.stop()
         rpcEnv.shutdown()
       } catch {
-        case e: ServerStateException =>
-          logInfo("Exception while SparkEnv stop", e)
         case NonFatal(e) =>
+          logInfo("Exception while SparkEnv stop", e)
           throw e;
       }
 

From 12f66b52816152bc5a9db39964708e1c173c5286 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Fri, 26 Jun 2015 11:11:17 -0700
Subject: [PATCH 07/12] Fix for SparkContext stop behavior

---
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 24a9334cb513..d79ec5cddf19 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
-import scala.util.control.NonFatal
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -284,7 +283,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       sc.stop()
     } catch {
       case e: Exception =>
-        fail("calling multiple sc.stop() must not have thrown any exception");
+        fail("calling multiple sc.stop() must not throw any exception", e);
     }
   }
 }

From 1aff39c675a87f2d3f3be22a3955ddc2e4997d7b Mon Sep 17 00:00:00 2001
From: Joshi
Date: Fri, 26 Jun 2015 11:15:14 -0700
Subject: [PATCH 08/12] Fix for SparkContext stop behavior

---
 .../src/test/scala/org/apache/spark/SparkContextSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index d79ec5cddf19..35cd2b1e1fff 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
+import org.scalatest.Matchers._
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -274,16 +275,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("calling multiple sc.stop() must not throw any exception") {
-    try {
+    noException should be thrownBy {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
       val cnt = sc.parallelize(1 to 4).count()
       sc.cancelAllJobs()
       sc.stop()
       // call stop second time
       sc.stop()
-    } catch {
-      case e: Exception =>
-        fail("calling multiple sc.stop() must not throw any exception", e);
     }
   }
 }

From c97839ace8b1f5407be0a4f4ed3202395b4c6e2f Mon Sep 17 00:00:00 2001
From: Joshi
Date: Fri, 26 Jun 2015 11:38:35 -0700
Subject: [PATCH 09/12] Fix for SparkContext stop behavior

---
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 35cd2b1e1fff..04a00425f5eb 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -275,13 +275,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("calling multiple sc.stop() must not throw any exception") {
-    noException should be thrownBy {
+    try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
       val cnt = sc.parallelize(1 to 4).count()
       sc.cancelAllJobs()
       sc.stop()
       // call stop second time
       sc.stop()
+    } catch {
+      case e: Exception =>
+        fail("calling multiple sc.stop() must not have thrown any exception", e);
     }
   }
 }

From 2ce57601e95623206db5acd38da09dc84a44044f Mon Sep 17 00:00:00 2001
From: Joshi
Date: Mon, 29 Jun 2015 12:42:35 -0700
Subject: [PATCH 10/12] Fix for SparkContext stop behavior

---
 .../src/test/scala/org/apache/spark/SparkContextSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 04a00425f5eb..5c57940fa5f7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -275,16 +275,14 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("calling multiple sc.stop() must not throw any exception") {
-    try {
+    noException should be thrownBy {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
       val cnt = sc.parallelize(1 to 4).count()
       sc.cancelAllJobs()
       sc.stop()
       // call stop second time
       sc.stop()
-    } catch {
-      case e: Exception =>
-        fail("calling multiple sc.stop() must not have thrown any exception", e);
     }
   }
+
 }

From 446b0a4206f07525b726413444b6354dca55fea3 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Mon, 29 Jun 2015 20:22:51 -0700
Subject: [PATCH 11/12] Fix for SparkContext stop behavior

---
 .../scala/org/apache/spark/SparkEnv.scala | 28 ++++++++-----------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index c1ffe7840d9e..e49779b3b094 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -95,22 +95,18 @@ class SparkEnv (
 
     if (!isStopped) {
       isStopped = true
-      try {
-        pythonWorkers.foreach { case (key, worker) => worker.stop()}
-        Option(httpFileServer).foreach(_.stop())
-        mapOutputTracker.stop()
-        shuffleManager.stop()
-        broadcastManager.stop()
-        blockManager.stop()
-        blockManager.master.stop()
-        metricsSystem.stop()
-        outputCommitCoordinator.stop()
-        rpcEnv.shutdown()
-      } catch {
-        case NonFatal(e) =>
-          logInfo("Exception while SparkEnv stop", e)
-          throw e;
-      }
+
+      pythonWorkers.foreach { case (key, worker) => worker.stop()}
+      Option(httpFileServer).foreach(_.stop())
+      mapOutputTracker.stop()
+      shuffleManager.stop()
+      broadcastManager.stop()
+      blockManager.stop()
+      blockManager.master.stop()
+      metricsSystem.stop()
+      outputCommitCoordinator.stop()
+      rpcEnv.shutdown()
+
 
       // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
      // down, but let's call it anyway in case it gets fixed in a later release

From 277043eaac8f168da5d04715334fc7916afa577d Mon Sep 17 00:00:00 2001
From: Joshi
Date: Tue, 30 Jun 2015 10:49:26 -0700
Subject: [PATCH 12/12] Fix for SparkContext stop behavior

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e49779b3b094..1b133fbdfaf5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import akka.actor.ActorSystem
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Properties
 
@@ -45,8 +44,6 @@ import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
-import scala.util.control.NonFatal
-
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -95,8 +92,7 @@ class SparkEnv (
 
     if (!isStopped) {
       isStopped = true
-
-      pythonWorkers.foreach { case (key, worker) => worker.stop()}
+      pythonWorkers.values.foreach(_.stop())
       Option(httpFileServer).foreach(_.stop())
       mapOutputTracker.stop()
       shuffleManager.stop()
@@ -107,7 +103,6 @@ class SparkEnv (
       outputCommitCoordinator.stop()
       rpcEnv.shutdown()
 
-
       // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
       // down, but let's call it anyway in case it gets fixed in a later release
       // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
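
The net effect of the twelve patches is a stop() that is safe to call more than once: the shutdown sequence in SparkEnv is guarded by the isStopped flag, so a second sc.stop() returns quietly instead of failing on an already-stopped HTTP file server. The Scala sketch below is illustrative only, not Spark source; HttpLikeServer, DemoEnv and StopTwiceDemo are made-up names used to show the same guard-flag pattern in isolation.

// Minimal sketch of the idempotent-stop pattern the series converges on.
class HttpLikeServer {
  private var running = true

  def stop(): Unit = {
    // Mirrors the failure mode behind SPARK-2645: stopping an
    // already-stopped server throws.
    if (!running) throw new IllegalStateException("Server is already stopped")
    running = false
  }
}

class DemoEnv {
  private var isStopped = false
  private val fileServer = new HttpLikeServer

  // Run the shutdown sequence at most once; later calls return immediately,
  // so callers may invoke stop() any number of times without an exception.
  def stop(): Unit = {
    if (!isStopped) {
      isStopped = true
      fileServer.stop()
      // ...stop the remaining subsystems here...
    }
  }
}

object StopTwiceDemo extends App {
  val env = new DemoEnv
  env.stop()
  env.stop() // second call is a harmless no-op
  println("stopped twice without an exception")
}

The suite's final form checks the same property with ScalaTest's noException should be thrownBy { ... } idiom, which fails the test if the wrapped block throws anything.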