@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _listenerBusStarted: Boolean = false
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
+  private var _shutdownHookRef: AnyRef = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+    // Make sure the context is stopped if the user forgets about it. This avoids leaving
+    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+    // is killed, though.
+    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Invoking stop() from shutdown hook")
+      stop()
+    }
   } catch {
     case NonFatal(e) =>
       logError("Error initializing SparkContext.", e)
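Note on the hunk above: the hook is registered through `Utils.addShutdownHook`, which returns an opaque reference precisely so the hook can be unregistered later (see the `stop()` hunk further down). As a rough mental model only — not Spark's actual `Utils` internals, and `HookTable`/`HookEntry` are hypothetical names — a prioritized hook table backed by a single JVM shutdown hook could look like this:

```scala
import scala.collection.mutable

object HookTable {
  private class HookEntry(val priority: Int, val hook: () => Unit)
  private val hooks = mutable.LinkedHashSet.empty[HookEntry]

  // Install the single JVM-level shutdown hook lazily, on first registration.
  private lazy val installed: Unit = Runtime.getRuntime.addShutdownHook(
    new Thread(new Runnable { def run(): Unit = runAll() }))

  def add(priority: Int)(hook: () => Unit): AnyRef = synchronized {
    installed
    val entry = new HookEntry(priority, hook)
    hooks += entry
    entry // opaque reference, analogous to _shutdownHookRef
  }

  def remove(ref: AnyRef): Boolean = synchronized {
    ref match {
      case e: HookEntry => hooks.remove(e)
      case _            => false
    }
  }

  // Higher-priority hooks run first, so a context-level hook can stop the
  // SparkContext before lower-priority cleanup hooks get their turn.
  private def runAll(): Unit =
    synchronized(hooks.toSeq).sortBy(-_.priority).foreach(_.hook())
}
```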
@@ -1065,7 +1074,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope {
     val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (partitioners.size == 1) {
+    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
     } else {
       new UnionRDD(this, rdds)
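The added `forall` guard matters because `rdds.flatMap(_.partitioner)` silently drops RDDs whose partitioner is `None`: a list where only one input is partitioned still produces a singleton set, so the old check would wrongly select `PartitionerAwareUnionRDD`. A minimal sketch of the failure mode (assuming a local-mode context; names like `byKey`/`plain` are just for illustration):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("union-demo"))

val byKey = sc.parallelize(Seq(1 -> "a")).partitionBy(new HashPartitioner(2))
val plain = sc.parallelize(Seq(2 -> "b")) // partitioner == None

// flatMap drops the None, so the old check sees exactly one partitioner...
assert(Seq(byKey, plain).flatMap(_.partitioner).toSet.size == 1)
// ...even though not every input is partitioned, which is what the new guard catches:
assert(!Seq(byKey, plain).forall(_.partitioner.isDefined))

sc.stop()
```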
@@ -1492,6 +1501,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("SparkContext already stopped.")
       return
     }
+    if (_shutdownHookRef != null) {
+      Utils.removeShutdownHook(_shutdownHookRef)
+    }
 
     postApplicationEnd()
     _ui.foreach(_.stop())
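Unregistering the hook here means an explicit `stop()` is not followed by a second, redundant `stop()` from the hook at JVM exit. A sketch of the lifecycle this supports, assuming `conf` is an already-configured `SparkConf`:

```scala
val sc = new SparkContext(conf)
try {
  // ... run jobs ...
} finally {
  sc.stop() // unregisters _shutdownHookRef; JVM exit won't invoke stop() again
}
// If the user never calls stop(), the hook registered at construction time
// still runs it, so event logs are closed on a clean JVM exit.
```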
@@ -1902,7 +1914,7 @@ object SparkContext extends Logging {
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
    */
-  private val activeContext: AtomicReference[SparkContext] = 
+  private val activeContext: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
   /**
@@ -1955,11 +1967,11 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
-   * this is useful when applications may wish to share a SparkContext. 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
+   * this is useful when applications may wish to share a SparkContext.
    *
-   * Note: This function cannot be used to create multiple SparkContext instances 
+   * Note: This function cannot be used to create multiple SparkContext instances
    * even if multiple contexts are allowed.
    */
   def getOrCreate(config: SparkConf): SparkContext = {
@@ -1972,17 +1984,17 @@ object SparkContext extends Logging {
       activeContext.get()
     }
   }
-  
+
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
    * this is useful when applications may wish to share a SparkContext.
-   * 
+   *
    * This method allows not passing a SparkConf (useful if just retrieving).
-   * 
-   * Note: This function cannot be used to create multiple SparkContext instances 
-   * even if multiple contexts are allowed. 
-   */ 
+   *
+   * Note: This function cannot be used to create multiple SparkContext instances
+   * even if multiple contexts are allowed.
+   */
   def getOrCreate(): SparkContext = {
     getOrCreate(new SparkConf())
   }