From afaa7e37cce76da8173ff556d217a67a1633426a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 15:06:13 -0800 Subject: [PATCH 01/14] [SPARK-4180] Prevent creations of multiple active SparkContexts. --- .../scala/org/apache/spark/SparkContext.scala | 100 ++++++++++++++---- .../org/apache/spark/SparkContextSuite.scala | 48 ++++++++- 2 files changed, 124 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8b4db783979ec..fb3364e87dd4a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -179,6 +179,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { conf.setIfMissing("spark.driver.host", Utils.localHostName()) conf.setIfMissing("spark.driver.port", "0") + // This is placed after the configuration validation so that common configuration errors, like + // forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being + // constructed. + SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + SparkContext.activeSparkContextCreationSite.foreach { creationSite => + val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)." + val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) { + s"The currently active SparkContext was created at ${creationSite.shortForm}" + } else { + s"Another SparkContext, created at ${creationSite.shortForm}, is either being constructed" + + " or threw an exception from its constructor; please restart your JVM in order to" + + " create a new SparkContext." + } + val exception = new SparkException(s"$errMsg $errDetails") + if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) { + logWarning("Multiple SparkContext error detection is disabled!", exception) + } else { + throw exception + } + } + SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite()) + SparkContext.activeSparkContextIsFullyConstructed = false + } + val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten @@ -1071,27 +1095,31 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { /** Shut down the SparkContext. */ def stop() { - postApplicationEnd() - ui.foreach(_.stop()) - // Do this only if not stopped already - best case effort. - // prevent NPE if stopped more than once. - val dagSchedulerCopy = dagScheduler - dagScheduler = null - if (dagSchedulerCopy != null) { - env.metricsSystem.report() - metadataCleaner.cancel() - env.actorSystem.stop(heartbeatReceiver) - cleaner.foreach(_.stop()) - dagSchedulerCopy.stop() - taskScheduler = null - // TODO: Cache.stop()? - env.stop() - SparkEnv.set(null) - listenerBus.stop() - eventLogger.foreach(_.stop()) - logInfo("Successfully stopped SparkContext") - } else { - logInfo("SparkContext already stopped") + SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + SparkContext.activeSparkContextCreationSite = None + SparkContext.activeSparkContextIsFullyConstructed = false + postApplicationEnd() + ui.foreach(_.stop()) + // Do this only if not stopped already - best case effort. + // prevent NPE if stopped more than once. 
+ val dagSchedulerCopy = dagScheduler + dagScheduler = null + if (dagSchedulerCopy != null) { + env.metricsSystem.report() + metadataCleaner.cancel() + env.actorSystem.stop(heartbeatReceiver) + cleaner.foreach(_.stop()) + dagSchedulerCopy.stop() + taskScheduler = null + // TODO: Cache.stop()? + env.stop() + SparkEnv.set(null) + listenerBus.stop() + eventLogger.foreach(_.stop()) + logInfo("Successfully stopped SparkContext") + } else { + logInfo("SparkContext already stopped") + } } } @@ -1157,7 +1185,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { if (dagScheduler == null) { throw new SparkException("SparkContext has been shutdown") } - val callSite = getCallSite + val callSite = Utils.getCallSite() val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, @@ -1380,6 +1408,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { private[spark] def cleanup(cleanupTime: Long) { persistentRdds.clearOldValues(cleanupTime) } + + SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + SparkContext.activeSparkContextIsFullyConstructed = true + } } /** @@ -1388,6 +1420,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { */ object SparkContext extends Logging { + /** + * Lock that prevents multiple threads from being in the SparkContext constructor at the same + * time. + */ + private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() + + /** + * Records the creation site of the last SparkContext to successfully enter the constructor. + * This may be an active SparkContext, or a SparkContext that is currently under construction. + * + * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + */ + private[spark] var activeSparkContextCreationSite: Option[CallSite] = None + + /** + * Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext + * or a partially-constructed one that is either still executing its constructor or threw + * an exception from its constructor. This is used to enable better error-reporting when + * SparkContext construction fails due to existing contexts. 
+ * + * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + */ + private[spark] var activeSparkContextIsFullyConstructed: Boolean = false + private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 31edad1c56c73..78afd06a8b6a2 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -21,9 +21,53 @@ import org.scalatest.FunSuite import org.apache.hadoop.io.BytesWritable -class SparkContextSuite extends FunSuite { - //Regression test for SPARK-3121 +class SparkContextSuite extends FunSuite with LocalSparkContext { + + test("Only one SparkContext may be active at a time") { + // Regression test for SPARK-4180 + val conf = new SparkConf().setAppName("test").setMaster("local") + sc = new SparkContext(conf) + // A SparkContext is already running, so we shouldn't be able to create a second one + intercept[SparkException] { new SparkContext(conf) } + // After stopping the running context, we should be able to create a new one + resetSparkContext() + sc = new SparkContext(conf) + } + + test("Can still construct a new SparkContext after failing due to missing app name or master") { + val missingMaster = new SparkConf() + val missingAppName = missingMaster.clone.setMaster("local") + val validConf = missingAppName.clone.setAppName("test") + // We shouldn't be able to construct SparkContexts because these are invalid configurations + intercept[SparkException] { new SparkContext(missingMaster) } + intercept[SparkException] { new SparkContext(missingAppName) } + // Even though those earlier calls failed, we should still be able to create a new SparkContext + sc = new SparkContext(validConf) + } + + test("Check for multiple SparkContexts can be disabled via undocumented debug option") { + val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking" + val originalPropertyValue = System.getProperty(propertyName) + var secondSparkContext: SparkContext = null + try { + System.setProperty(propertyName, "true") + val conf = new SparkConf().setAppName("test").setMaster("local") + sc = new SparkContext(conf) + secondSparkContext = new SparkContext(conf) + } finally { + if (secondSparkContext != null) { + secondSparkContext.stop() + } + if (originalPropertyValue != null) { + System.setProperty(propertyName, originalPropertyValue) + } else { + System.clearProperty(propertyName) + } + } + } + test("BytesWritable implicit conversion is correct") { + // Regression test for SPARK-3121 val bytesWritable = new BytesWritable() val inputArray = (1 to 10).map(_.toByte).toArray bytesWritable.set(inputArray, 0, 10) From 918e8782eb94ea59330f285cbbaaa7aaa7235583 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 15:50:44 -0800 Subject: [PATCH 02/14] Document "one SparkContext per JVM" limitation. 
--- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +++ .../scala/org/apache/spark/api/java/JavaSparkContext.scala | 3 +++ docs/programming-guide.md | 2 ++ 3 files changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fb3364e87dd4a..0fa218ecb0e69 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -57,6 +57,9 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * + * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before + * creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details. + * * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index e3aeba7e6c39d..30f8fbe3f0a47 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -46,6 +46,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. + * + * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before + * creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details. */ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround with Closeable { diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 18420afb27e3c..b4e252587d2e7 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -117,6 +117,8 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/ how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object that contains information about your application. +Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. + {% highlight scala %} val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf) From c4d35a255c5b912f83df8b55557edb1f557fa251 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 18:22:30 -0800 Subject: [PATCH 03/14] Log long form of creation site to aid debugging. 
--- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0fa218ecb0e69..ecfd88a4909cb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -189,11 +189,11 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { SparkContext.activeSparkContextCreationSite.foreach { creationSite => val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)." val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) { - s"The currently active SparkContext was created at ${creationSite.shortForm}" + s"The currently active SparkContext was created at:\n${creationSite.longForm}" } else { - s"Another SparkContext, created at ${creationSite.shortForm}, is either being constructed" + - " or threw an exception from its constructor; please restart your JVM in order to" + - " create a new SparkContext." + s"Another SparkContext is either being constructed or threw an exception from its" + + " constructor; please restart your JVM in order to create a new SparkContext." + + s"The current SparkContext was created at:\n${creationSite.longForm}" } val exception = new SparkException(s"$errMsg $errDetails") if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) { From d0437ebbabf623f3e102ffaae166baae99786d90 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 18:23:22 -0800 Subject: [PATCH 04/14] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet. --- .../spark/streaming/StreamingContext.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 23d6d1c5e50fa..d838f03fbae37 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -487,20 +487,20 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - // Warn (but not fail) if context is stopped twice, - // or context is stopped before starting - if (state == Initialized) { - logWarning("StreamingContext has not been started yet") - return - } if (state == Stopped) { logWarning("StreamingContext has already been stopped") - return - } // no need to throw an exception as its okay to stop twice - scheduler.stop(stopGracefully) - logInfo("StreamingContext stopped successfully") - waiter.notifyStop() - if (stopSparkContext) sc.stop() + } else { + // Even if the streaming context has not been started, we still need to stop the SparkContext: + if (stopSparkContext) sc.stop() + if (state == Initialized) { + logWarning("StreamingContext has not been started yet") + } else { + scheduler.stop(stopGracefully) + logInfo("StreamingContext stopped successfully") + waiter.notifyStop() + } + } + // The state should always be Stopped after calling `stop()`, even if we haven't started yet: state = Stopped } } From 06c5c541cf8fcd72715fd304ea8f12b7cfbc8463 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Nov 2014 18:24:15 -0800 Subject: [PATCH 05/14] Add / 
improve SparkContext cleanup in streaming BasicOperationsSuite --- .../streaming/BasicOperationsSuite.scala | 188 +++++++++--------- .../spark/streaming/TestSuiteBase.scala | 52 ++++- 2 files changed, 138 insertions(+), 102 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 6c8bb50145367..ba6e54d752fb0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -395,33 +395,34 @@ class BasicOperationsSuite extends TestSuiteBase { } test("slice") { - val ssc = new StreamingContext(conf, Seconds(1)) - val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) - val stream = new TestInputStream[Int](ssc, input, 2) - stream.foreachRDD(_ => {}) // Dummy output stream - ssc.start() - Thread.sleep(2000) - def getInputFromSlice(fromMillis: Long, toMillis: Long) = { - stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet - } + withStreamingContext(new StreamingContext(conf, Seconds(1))) { ssc => + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + stream.foreachRDD(_ => {}) // Dummy output stream + ssc.start() + Thread.sleep(2000) + def getInputFromSlice(fromMillis: Long, toMillis: Long) = { + stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet + } - assert(getInputFromSlice(0, 1000) == Set(1)) - assert(getInputFromSlice(0, 2000) == Set(1, 2)) - assert(getInputFromSlice(1000, 2000) == Set(1, 2)) - assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4)) - ssc.stop() - Thread.sleep(1000) + assert(getInputFromSlice(0, 1000) == Set(1)) + assert(getInputFromSlice(0, 2000) == Set(1, 2)) + assert(getInputFromSlice(1000, 2000) == Set(1, 2)) + assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4)) + } } - +/* test("slice - has not been initialized") { - val ssc = new StreamingContext(conf, Seconds(1)) - val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) - val stream = new TestInputStream[Int](ssc, input, 2) - val thrown = intercept[SparkException] { - stream.slice(new Time(0), new Time(1000)) + withStreamingContext(new StreamingContext(conf, Seconds(1))) { ssc => + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + val thrown = intercept[SparkException] { + stream.slice(new Time(0), new Time(1000)) + } + assert(thrown.getMessage.contains("has not been initialized")) } - assert(thrown.getMessage.contains("has not been initialized")) } + */ val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq @@ -480,73 +481,72 @@ class BasicOperationsSuite extends TestSuiteBase { test("rdd cleanup - input blocks and persisted RDDs") { // Actually receive data over through receiver to create BlockRDDs - // Start the server - val testServer = new TestServer() - testServer.start() - - // Set up the streaming context and input streams - val ssc = new StreamingContext(conf, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) - val mappedStream = networkStream.map(_ + ".").persist() - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(mappedStream, outputBuffer) - - outputStream.register() - ssc.start() - - // Feed data to the server to send to the network receiver - val clock 
= ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5, 6) + withTestServer(new TestServer()) { testServer => + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + testServer.start() + // Set up the streaming context and input streams + val networkStream = + ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) + val mappedStream = networkStream.map(_ + ".").persist() + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(mappedStream, outputBuffer) + + outputStream.register() + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5, 6) + + val blockRdds = new mutable.HashMap[Time, BlockRDD[_]] + val persistentRddIds = new mutable.HashMap[Time, Int] + + def collectRddInfo() { // get all RDD info required for verification + networkStream.generatedRDDs.foreach { case (time, rdd) => + blockRdds(time) = rdd.asInstanceOf[BlockRDD[_]] + } + mappedStream.generatedRDDs.foreach { case (time, rdd) => + persistentRddIds(time) = rdd.id + } + } - val blockRdds = new mutable.HashMap[Time, BlockRDD[_]] - val persistentRddIds = new mutable.HashMap[Time, Int] + Thread.sleep(200) + for (i <- 0 until input.size) { + testServer.send(input(i).toString + "\n") + Thread.sleep(200) + clock.addToTime(batchDuration.milliseconds) + collectRddInfo() + } - def collectRddInfo() { // get all RDD info required for verification - networkStream.generatedRDDs.foreach { case (time, rdd) => - blockRdds(time) = rdd.asInstanceOf[BlockRDD[_]] - } - mappedStream.generatedRDDs.foreach { case (time, rdd) => - persistentRddIds(time) = rdd.id + Thread.sleep(200) + collectRddInfo() + logInfo("Stopping server") + testServer.stop() + + // verify data has been received + assert(outputBuffer.size > 0) + assert(blockRdds.size > 0) + assert(persistentRddIds.size > 0) + + import Time._ + + val latestPersistedRddId = persistentRddIds(persistentRddIds.keySet.max) + val earliestPersistedRddId = persistentRddIds(persistentRddIds.keySet.min) + val latestBlockRdd = blockRdds(blockRdds.keySet.max) + val earliestBlockRdd = blockRdds(blockRdds.keySet.min) + // verify that the latest mapped RDD is persisted but the earliest one has been unpersisted + assert(ssc.sparkContext.persistentRdds.contains(latestPersistedRddId)) + assert(!ssc.sparkContext.persistentRdds.contains(earliestPersistedRddId)) + + // verify that the latest input blocks are present but the earliest blocks have been removed + assert(latestBlockRdd.isValid) + assert(latestBlockRdd.collect != null) + assert(!earliestBlockRdd.isValid) + earliestBlockRdd.blockIds.foreach { blockId => + assert(!ssc.sparkContext.env.blockManager.master.contains(blockId)) + } } } - - Thread.sleep(200) - for (i <- 0 until input.size) { - testServer.send(input(i).toString + "\n") - Thread.sleep(200) - clock.addToTime(batchDuration.milliseconds) - collectRddInfo() - } - - Thread.sleep(200) - collectRddInfo() - logInfo("Stopping server") - testServer.stop() - logInfo("Stopping context") - - // verify data has been received - assert(outputBuffer.size > 0) - assert(blockRdds.size > 0) - assert(persistentRddIds.size > 0) - - import Time._ - - val latestPersistedRddId = persistentRddIds(persistentRddIds.keySet.max) - val earliestPersistedRddId = persistentRddIds(persistentRddIds.keySet.min) - val latestBlockRdd = blockRdds(blockRdds.keySet.max) - val 
earliestBlockRdd = blockRdds(blockRdds.keySet.min) - // verify that the latest mapped RDD is persisted but the earliest one has been unpersisted - assert(ssc.sparkContext.persistentRdds.contains(latestPersistedRddId)) - assert(!ssc.sparkContext.persistentRdds.contains(earliestPersistedRddId)) - - // verify that the latest input blocks are present but the earliest blocks have been removed - assert(latestBlockRdd.isValid) - assert(latestBlockRdd.collect != null) - assert(!earliestBlockRdd.isValid) - earliestBlockRdd.blockIds.foreach { blockId => - assert(!ssc.sparkContext.env.blockManager.master.contains(blockId)) - } - ssc.stop() } /** Test cleanup of RDDs in DStream metadata */ @@ -560,13 +560,15 @@ class BasicOperationsSuite extends TestSuiteBase { // Setup the stream computation assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second, check cleanup tests") - val ssc = setupStreams(cleanupTestInput, operation) - val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]] - if (rememberDuration != null) ssc.remember(rememberDuration) - val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.time === Seconds(10).milliseconds) - assert(output.size === numExpectedOutput) - operatedStream + withStreamingContext(setupStreams(cleanupTestInput, operation)) { ssc => + val operatedStream = + ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]] + if (rememberDuration != null) ssc.remember(rememberDuration) + val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + assert(clock.time === Seconds(10).milliseconds) + assert(output.size === numExpectedOutput) + operatedStream + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 2154c24abda3a..52972f63c6c5c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -163,6 +163,40 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { before(beforeFunction) after(afterFunction) + /** + * Run a block of code with the given StreamingContext and automatically + * stop the context when the block completes or when an exception is thrown. + */ + def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = { + try { + block(ssc) + } finally { + try { + ssc.stop(stopSparkContext = true) + } catch { + case e: Exception => + logError("Error stopping StreamingContext", e) + } + } + } + + /** + * Run a block of code with the given TestServer and automatically + * stop the server when the block completes or when an exception is thrown. + */ + def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = { + try { + block(testServer) + } finally { + try { + testServer.stop() + } catch { + case e: Exception => + logError("Error stopping TestServer", e) + } + } + } + /** * Set up required DStreams to test the DStream operation using the two sequences * of input collections. 
@@ -282,10 +316,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") Thread.sleep(100) // Give some time for the forgetting old RDDs to complete - } catch { - case e: Exception => {e.printStackTrace(); throw e} } finally { - ssc.stop() + ssc.stop(stopSparkContext = true) } output } @@ -351,9 +383,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { useSet: Boolean ) { val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size - val ssc = setupStreams[U, V](input, operation) - val output = runStreams[V](ssc, numBatches_, expectedOutput.size) - verifyOutput[V](output, expectedOutput, useSet) + withStreamingContext(setupStreams[U, V](input, operation)) { ssc => + val output = runStreams[V](ssc, numBatches_, expectedOutput.size) + verifyOutput[V](output, expectedOutput, useSet) + } } /** @@ -389,8 +422,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { useSet: Boolean ) { val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size - val ssc = setupStreams[U, V, W](input1, input2, operation) - val output = runStreams[W](ssc, numBatches_, expectedOutput.size) - verifyOutput[W](output, expectedOutput, useSet) + withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc => + val output = runStreams[W](ssc, numBatches_, expectedOutput.size) + verifyOutput[W](output, expectedOutput, useSet) + } } } From ed17e14621f86137ccdc5408c9d10b3e683903f6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Nov 2014 17:00:18 -0800 Subject: [PATCH 06/14] Address review feedback; expose hack workaround for existing unit tests. --- .../scala/org/apache/spark/SparkContext.scala | 78 ++++++++++++------- .../ExecutorAllocationManagerSuite.scala | 4 + .../org/apache/spark/SparkContextSuite.scala | 2 +- 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 51292600a7303..1c6636c11a98d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -185,26 +185,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // This is placed after the configuration validation so that common configuration errors, like // forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being // constructed. - SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - SparkContext.activeSparkContextCreationSite.foreach { creationSite => - val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)." - val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) { - s"The currently active SparkContext was created at:\n${creationSite.longForm}" - } else { - s"Another SparkContext is either being constructed or threw an exception from its" + - " constructor; please restart your JVM in order to create a new SparkContext." 
+ - s"The current SparkContext was created at:\n${creationSite.longForm}" - } - val exception = new SparkException(s"$errMsg $errDetails") - if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) { - logWarning("Multiple SparkContext error detection is disabled!", exception) - } else { - throw exception - } - } - SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite()) - SparkContext.activeSparkContextIsFullyConstructed = false - } + SparkContext.verifyUniqueConstruction(conf) val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten @@ -1124,8 +1105,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { /** Shut down the SparkContext. */ def stop() { SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - SparkContext.activeSparkContextCreationSite = None - SparkContext.activeSparkContextIsFullyConstructed = false postApplicationEnd() ui.foreach(_.stop()) // Do this only if not stopped already - best case effort. @@ -1145,6 +1124,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { listenerBus.stop() eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") + SparkContext.clearActiveContext() } else { logInfo("SparkContext already stopped") } @@ -1437,9 +1417,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { persistentRdds.clearOldValues(cleanupTime) } - SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - SparkContext.activeSparkContextIsFullyConstructed = true - } + SparkContext.markFullyConstructed() } /** @@ -1470,7 +1448,55 @@ object SparkContext extends Logging { * * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK */ - private[spark] var activeSparkContextIsFullyConstructed: Boolean = false + private[spark] var activeContextIsFullyConstructed: Boolean = false + + /** + * Called in the SparkContext constructor to ensure that no other SparkContext is running + * in the same JVM. + */ + private[spark] def verifyUniqueConstruction(conf: SparkConf) { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + activeSparkContextCreationSite.foreach { creationSite => + val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + val errDetails = if (activeContextIsFullyConstructed) { + s"The currently running SparkContext was created at:\n${creationSite.longForm}" + } else { + s"Another SparkContext is either being constructed or threw an exception from its" + + " constructor; please restart your JVM in order to create a new SparkContext." + + s"The current SparkContext was created at:\n${creationSite.longForm}" + } + val exception = new SparkException(s"$errMsg $errDetails") + if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) { + logWarning("Multiple running SparkContexts detected in the same JVM!", exception) + } else { + throw exception + } + } + activeSparkContextCreationSite = Some(Utils.getCallSite()) + activeContextIsFullyConstructed = false + } + } + + /** + * Called once the SparkContext that's undergoing construction is fully constructed. + */ + private[spark] def markFullyConstructed() { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + activeContextIsFullyConstructed = true + } + } + + /** + * Clears the active SparkContext metadata. This is called by `SparkContext.stop()`. 
It's + * also called in unit tests to prevent a flood of warnings from test suites that don't / can't + * properly clean up their SparkContexts. + */ + private[spark] def clearActiveContext() { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + activeSparkContextCreationSite = None + activeContextIsFullyConstructed = false + } + } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 66cf60d25f6d1..6a145c6fcc227 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -37,20 +37,24 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { .set("spark.dynamicAllocation.enabled", "true") intercept[SparkException] { new SparkContext(conf) } SparkEnv.get.stop() // cleanup the created environment + SparkContext.clearActiveContext() // Only min val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1") intercept[SparkException] { new SparkContext(conf1) } SparkEnv.get.stop() + SparkContext.clearActiveContext() // Only max val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2") intercept[SparkException] { new SparkContext(conf2) } SparkEnv.get.stop() + SparkContext.clearActiveContext() // Both min and max, but min > max intercept[SparkException] { createSparkContext(2, 1) } SparkEnv.get.stop() + SparkContext.clearActiveContext() // Both min and max, and min == max val sc1 = createSparkContext(1, 1) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 78afd06a8b6a2..095e29e39abb3 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -46,7 +46,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } test("Check for multiple SparkContexts can be disabled via undocumented debug option") { - val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking" + val propertyName = "spark.driver.allowMultipleContexts" val originalPropertyValue = System.getProperty(propertyName) var secondSparkContext: SparkContext = null try { From 4629d5c3b9305353526992760a80e5ff0d5ae9aa Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Nov 2014 17:02:24 -0800 Subject: [PATCH 07/14] Set spark.driver.allowMultipleContexts=true in tests. --- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 657e4b4432775..b633e0266ded0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -348,6 +348,7 @@ object TestSettings { javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dspark.port.maxRetries=100", javaOptions in Test += "-Dspark.ui.enabled=false", + javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") .map { case (k,v) => s"-D$k=$v" }.toSeq, From 7ba6db8e89a18cfa6e56c8b390998a2abb7b8560 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Nov 2014 17:09:28 -0800 Subject: [PATCH 08/14] Add utility to set system properties in tests. 
--- .../org/apache/spark/SparkContextSuite.scala | 77 +++++++++++-------- 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 095e29e39abb3..2a01effe863dc 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -23,45 +23,60 @@ import org.apache.hadoop.io.BytesWritable class SparkContextSuite extends FunSuite with LocalSparkContext { + /** Allows system properties to be changed in tests */ + private def withSystemProperty[T](property: String, value: String)(block: => T): T = { + val originalValue = System.getProperty(property) + try { + System.setProperty(property, value) + block + } finally { + if (originalValue == null) { + System.clearProperty(property) + } else { + System.setProperty(property, originalValue) + } + } + } + test("Only one SparkContext may be active at a time") { // Regression test for SPARK-4180 - val conf = new SparkConf().setAppName("test").setMaster("local") - sc = new SparkContext(conf) - // A SparkContext is already running, so we shouldn't be able to create a second one - intercept[SparkException] { new SparkContext(conf) } - // After stopping the running context, we should be able to create a new one - resetSparkContext() - sc = new SparkContext(conf) + withSystemProperty("spark.driver.allowMultipleContexts", "false") { + val conf = new SparkConf().setAppName("test").setMaster("local") + sc = new SparkContext(conf) + // A SparkContext is already running, so we shouldn't be able to create a second one + intercept[SparkException] { new SparkContext(conf) } + // After stopping the running context, we should be able to create a new one + resetSparkContext() + sc = new SparkContext(conf) + } } test("Can still construct a new SparkContext after failing due to missing app name or master") { - val missingMaster = new SparkConf() - val missingAppName = missingMaster.clone.setMaster("local") - val validConf = missingAppName.clone.setAppName("test") - // We shouldn't be able to construct SparkContexts because these are invalid configurations - intercept[SparkException] { new SparkContext(missingMaster) } - intercept[SparkException] { new SparkContext(missingAppName) } - // Even though those earlier calls failed, we should still be able to create a new SparkContext - sc = new SparkContext(validConf) + withSystemProperty("spark.driver.allowMultipleContexts", "false") { + val missingMaster = new SparkConf() + val missingAppName = missingMaster.clone.setMaster("local") + val validConf = missingAppName.clone.setAppName("test") + // We shouldn't be able to construct SparkContexts because these are invalid configurations + intercept[SparkException] { + new SparkContext(missingMaster) + } + intercept[SparkException] { + new SparkContext(missingAppName) + } + // Even though those earlier calls failed, we should still be able to create a new context + sc = new SparkContext(validConf) + } } test("Check for multiple SparkContexts can be disabled via undocumented debug option") { - val propertyName = "spark.driver.allowMultipleContexts" - val originalPropertyValue = System.getProperty(propertyName) - var secondSparkContext: SparkContext = null - try { - System.setProperty(propertyName, "true") - val conf = new SparkConf().setAppName("test").setMaster("local") - sc = new SparkContext(conf) - secondSparkContext = new SparkContext(conf) - } finally { - if 
(secondSparkContext != null) { - secondSparkContext.stop() - } - if (originalPropertyValue != null) { - System.setProperty(propertyName, originalPropertyValue) - } else { - System.clearProperty(propertyName) + withSystemProperty("spark.driver.allowMultipleContexts", "true") { + var secondSparkContext: SparkContext = null + try { + val conf = new SparkConf().setAppName("test").setMaster("local") + sc = new SparkContext(conf) + secondSparkContext = new SparkContext(conf) + } finally { + Option(secondSparkContext).foreach(_.stop()) } } } From 79a7e6ffa7d450246d04816fd493278a8bc60bc5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Nov 2014 00:10:34 -0800 Subject: [PATCH 09/14] Fix commented out test --- .../scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 5d3b06e4d368c..f4a269ce81ea0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -412,7 +412,6 @@ class BasicOperationsSuite extends TestSuiteBase { assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4)) } } -/* test("slice - has not been initialized") { withStreamingContext(new StreamingContext(conf, Seconds(1))) { ssc => val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) @@ -423,7 +422,6 @@ class BasicOperationsSuite extends TestSuiteBase { assert(thrown.getMessage.contains("has not been initialized")) } } - */ val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq From d809cb4222cd0d23f942fd1319d1e01f5d8a57cb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Nov 2014 13:22:23 -0800 Subject: [PATCH 10/14] Improve handling of failed SparkContext creation attempts. --- .../scala/org/apache/spark/SparkContext.scala | 98 +++++++++++-------- .../org/apache/spark/SparkContextSuite.scala | 14 +-- 2 files changed, 61 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e21bf4f991650..f0fe4cf4e280e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -66,6 +66,8 @@ import org.apache.spark.util._ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { + SparkContext.markPartiallyConstructed(this, config) + // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It // contains a map from hostname to a list of input format splits on the host. @@ -182,11 +184,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { conf.setIfMissing("spark.driver.host", Utils.localHostName()) conf.setIfMissing("spark.driver.port", "0") - // This is placed after the configuration validation so that common configuration errors, like - // forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being - // constructed. 
- SparkContext.verifyUniqueConstruction(conf) - val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten @@ -1421,7 +1418,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { persistentRdds.clearOldValues(cleanupTime) } - SparkContext.markFullyConstructed() + SparkContext.markFullyConstructed(this, config) } /** @@ -1431,62 +1428,82 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { object SparkContext extends Logging { /** - * Lock that prevents multiple threads from being in the SparkContext constructor at the same - * time. + * Lock that guards access to global variables that track SparkContext construction. */ private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /** - * Records the creation site of the last SparkContext to successfully enter the constructor. - * This may be an active SparkContext, or a SparkContext that is currently under construction. + * Records the creation site of the active, fully-constructed SparkContext. If no SparkContext + * is active, then this is `None`. * - * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK */ - private[spark] var activeSparkContextCreationSite: Option[CallSite] = None + private[spark] var activeContextCreationSite: Option[CallSite] = None /** - * Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext - * or a partially-constructed one that is either still executing its constructor or threw - * an exception from its constructor. This is used to enable better error-reporting when - * SparkContext construction fails due to existing contexts. + * Points to a partially-constructed SparkContext if some thread is in the SparkContext + * constructor, or `None` if no SparkContext is being constructed. * - * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK */ - private[spark] var activeContextIsFullyConstructed: Boolean = false + private[spark] var contextBeingConstructed: Option[SparkContext] = None /** - * Called in the SparkContext constructor to ensure that no other SparkContext is running - * in the same JVM. + * Called to ensure that no other SparkContext is running in this JVM. + * + * Throws an exception if a running context is detected and logs a warning if another thread is + * constructing a SparkContext. This warning is necessary because the current locking scheme + * prevents us from reliably distinguishing between cases where another context is being + * constructed and cases where another constructor threw an exception. */ - private[spark] def verifyUniqueConstruction(conf: SparkConf) { + private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf) { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - activeSparkContextCreationSite.foreach { creationSite => - val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." - val errDetails = if (activeContextIsFullyConstructed) { - s"The currently running SparkContext was created at:\n${creationSite.longForm}" - } else { - s"Another SparkContext is either being constructed or threw an exception from its" + - " constructor; please restart your JVM in order to create a new SparkContext." 
+ - s"The current SparkContext was created at:\n${creationSite.longForm}" + contextBeingConstructed.foreach { otherContext => + if (otherContext ne sc) { + val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" + + " constructor). This may indicate an error, since only one SparkContext may be" + + " running in this JVM (see SPARK-2243)." + logWarning(warnMsg) } - val exception = new SparkException(s"$errMsg $errDetails") - if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) { - logWarning("Multiple running SparkContexts detected in the same JVM!", exception) - } else { - throw exception + + activeContextCreationSite.foreach { creationSite => + val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + + " To ignore this error, set spark.driver.allowMultipleContexts = true. " + + s"The currently running SparkContext was created at:\n${creationSite.longForm}" + val exception = new SparkException(errMsg) + if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) { + logWarning("Multiple running SparkContexts detected in the same JVM!", exception) + } else { + throw exception + } } } - activeSparkContextCreationSite = Some(Utils.getCallSite()) - activeContextIsFullyConstructed = false } } /** - * Called once the SparkContext that's undergoing construction is fully constructed. + * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is + * running. Throws an exception if a running context is detected and logs a warning if another + * thread is constructing a SparkContext. This warning is necessary because the current locking + * scheme prevents us from reliably distinguishing between cases where another context is being + * constructed and cases where another constructor threw an exception. + */ + private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf) { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + assertNoOtherContextIsRunning(sc, conf) + contextBeingConstructed = Some(sc) + } + } + + /** + * Called at the end of the SparkContext constructor to ensure that no other SparkContext has + * raced with this constructor and started. 
*/ - private[spark] def markFullyConstructed() { + private[spark] def markFullyConstructed(sc: SparkContext, conf: SparkConf) { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - activeContextIsFullyConstructed = true + assertNoOtherContextIsRunning(sc, conf) + contextBeingConstructed = None + activeContextCreationSite = Some(Utils.getCallSite()) } } @@ -1497,8 +1514,7 @@ object SparkContext extends Logging { */ private[spark] def clearActiveContext() { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - activeSparkContextCreationSite = None - activeContextIsFullyConstructed = false + activeContextCreationSite = None } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 2a01effe863dc..9e454ddcc52a6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -51,20 +51,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } } - test("Can still construct a new SparkContext after failing due to missing app name or master") { + test("Can still construct a new SparkContext after failing to construct a previous one") { withSystemProperty("spark.driver.allowMultipleContexts", "false") { - val missingMaster = new SparkConf() - val missingAppName = missingMaster.clone.setMaster("local") - val validConf = missingAppName.clone.setAppName("test") - // We shouldn't be able to construct SparkContexts because these are invalid configurations + // This is an invalid configuration (no app name or master URL) intercept[SparkException] { - new SparkContext(missingMaster) - } - intercept[SparkException] { - new SparkContext(missingAppName) + new SparkContext(new SparkConf()) } // Even though those earlier calls failed, we should still be able to create a new context - sc = new SparkContext(validConf) + sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test")) } } From f5bb78c9db3f10e476b07198fa79385bd8d45a02 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Nov 2014 13:23:53 -0800 Subject: [PATCH 11/14] Update mvn build, too. --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 88ef67c515b3a..08323f649b468 100644 --- a/pom.xml +++ b/pom.xml @@ -965,6 +965,7 @@ ${session.executionRootDirectory} 1 false + true From 85a424a70c6bb63e159fd73d8f35d750a3aa92b1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 14 Nov 2014 23:18:29 -0800 Subject: [PATCH 12/14] Incorporate more review feedback. --- .../scala/org/apache/spark/SparkContext.scala | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f0fe4cf4e280e..725fcea551f39 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -66,8 +66,15 @@ import org.apache.spark.util._ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { + // In order to prevent multiple SparkContexts from being active at the same time, mark this + // context as having started construction SparkContext.markPartiallyConstructed(this, config) + /** + * The call site where this SparkContext was constructed. + */ + private val creationSite: CallSite = Utils.getCallSite() + // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. 
This is typically generated from InputFormatInfo.computePreferredLocations. It
   // contains a map from hostname to a list of input format splits on the host.
@@ -1194,7 +1201,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     if (dagScheduler == null) {
       throw new SparkException("SparkContext has been shutdown")
     }
-    val callSite = Utils.getCallSite()
+    val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite.shortForm)
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1418,7 +1425,9 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     persistentRdds.clearOldValues(cleanupTime)
   }
 
-  SparkContext.markFullyConstructed(this, config)
+  // In order to prevent multiple SparkContexts from being active at the same time, mark this
+  // context as having finished construction
+  SparkContext.setActiveContext(this, config)
 }
 
 /**
@@ -1430,15 +1439,14 @@ object SparkContext extends Logging {
   /**
    * Lock that guards access to global variables that track SparkContext construction.
    */
-  private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
+  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
 
   /**
-   * Records the creation site of the active, fully-constructed SparkContext. If no SparkContext
-   * is active, then this is `None`.
+   * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
    */
-  private[spark] var activeContextCreationSite: Option[CallSite] = None
+  private var activeContext: Option[SparkContext] = None
 
   /**
    * Points to a partially-constructed SparkContext if some thread is in the SparkContext
@@ -1446,7 +1454,7 @@ object SparkContext extends Logging {
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
    */
-  private[spark] var contextBeingConstructed: Option[SparkContext] = None
+  private var contextBeingConstructed: Option[SparkContext] = None
 
   /**
    * Called to ensure that no other SparkContext is running in this JVM.
@@ -1456,20 +1464,20 @@ object SparkContext extends Logging {
    * prevents us from reliably distinguishing between cases where another context is being
    * constructed and cases where another constructor threw an exception.
    */
-  private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf) {
+  private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       contextBeingConstructed.foreach { otherContext =>
-        if (otherContext ne sc) {
+        if (otherContext ne sc) {  // checks for reference equality
          val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
            " constructor). This may indicate an error, since only one SparkContext may be" +
            " running in this JVM (see SPARK-2243)."
          logWarning(warnMsg)
        }
 
-       activeContextCreationSite.foreach { creationSite =>
+       activeContext.foreach { ctx =>
          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
-           s"The currently running SparkContext was created at:\n${creationSite.longForm}"
+           s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
          val exception = new SparkException(errMsg)
          if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
@@ -1488,7 +1496,7 @@ object SparkContext extends Logging {
   * scheme prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
-  private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf) {
+  private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       assertNoOtherContextIsRunning(sc, conf)
       contextBeingConstructed = Some(sc)
@@ -1499,22 +1507,22 @@ object SparkContext extends Logging {
   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
   * raced with this constructor and started.
   */
-  private[spark] def markFullyConstructed(sc: SparkContext, conf: SparkConf) {
+  private[spark] def setActiveContext(sc: SparkContext, conf: SparkConf): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       assertNoOtherContextIsRunning(sc, conf)
       contextBeingConstructed = None
-      activeContextCreationSite = Some(Utils.getCallSite())
+      activeContext = Some(sc)
     }
   }
 
   /**
-   * Clears the active SparkContext metadata. This is called by `SparkContext.stop()`. It's
+   * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
    * also called in unit tests to prevent a flood of warnings from test suites that don't / can't
    * properly clean up their SparkContexts.
    */
-  private[spark] def clearActiveContext() {
+  private[spark] def clearActiveContext(): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      activeContextCreationSite = None
+      activeContext = None
     }
   }

From c0987d373596284c05189635f992a0828df2e0eb Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 14 Nov 2014 23:28:09 -0800
Subject: [PATCH 13/14] Accept boolean instead of SparkConf in methods.

---
 .../scala/org/apache/spark/SparkContext.scala | 35 ++++++++++++-------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 725fcea551f39..fd5f1e90778d0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -66,14 +66,17 @@ import org.apache.spark.util._
 
 class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
+  // The call site where this SparkContext was constructed.
+  private val creationSite: CallSite = Utils.getCallSite()
+
+  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
+  private val allowMultipleContexts: Boolean =
+    config.getBoolean("spark.driver.allowMultipleContexts", false)
+
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction
-  SparkContext.markPartiallyConstructed(this, config)
-
-  /**
-   * The call site where this SparkContext was constructed.
-   */
-  private val creationSite: CallSite = Utils.getCallSite()
+  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -1427,7 +1430,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction
-  SparkContext.setActiveContext(this, config)
+  SparkContext.setActiveContext(this, allowMultipleContexts)
 }
 
 /**
@@ -1464,7 +1467,9 @@ object SparkContext extends Logging {
   * prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
-  private def assertNoOtherContextIsRunning(sc: SparkContext, conf: SparkConf): Unit = {
+  private def assertNoOtherContextIsRunning(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       contextBeingConstructed.foreach { otherContext =>
         if (otherContext ne sc) {  // checks for reference equality
@@ -1479,7 +1484,7 @@ object SparkContext extends Logging {
           " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
           s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
         val exception = new SparkException(errMsg)
-        if (conf.getBoolean("spark.driver.allowMultipleContexts", false)) {
+        if (allowMultipleContexts) {
           logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
         } else {
           throw exception
@@ -1496,9 +1501,11 @@ object SparkContext extends Logging {
   * scheme prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
-  private[spark] def markPartiallyConstructed(sc: SparkContext, conf: SparkConf): Unit = {
+  private[spark] def markPartiallyConstructed(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      assertNoOtherContextIsRunning(sc, conf)
+      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
       contextBeingConstructed = Some(sc)
     }
   }
@@ -1507,9 +1514,11 @@ object SparkContext extends Logging {
   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
   * raced with this constructor and started.
   */
-  private[spark] def setActiveContext(sc: SparkContext, conf: SparkConf): Unit = {
+  private[spark] def setActiveContext(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      assertNoOtherContextIsRunning(sc, conf)
+      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
       contextBeingConstructed = None
       activeContext = Some(sc)
     }

From d38251b8c197f8b0aa907525d5bfbe08b047efdb Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 16 Nov 2014 21:27:07 -0800
Subject: [PATCH 14/14] Address latest round of feedback.

---
 .../scala/org/apache/spark/SparkContext.scala | 17 +++++++++++------
 .../spark/api/java/JavaSparkContext.scala     |  2 +-
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fd5f1e90778d0..01d81a6a8f876 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,12 +58,11 @@ import org.apache.spark.util._
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
  * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
- * creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details.
+ * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
  *
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-
 class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   // The call site where this SparkContext was constructed.
@@ -73,9 +72,9 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private val allowMultipleContexts: Boolean =
     config.getBoolean("spark.driver.allowMultipleContexts", false)
 
-
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
-  // context as having started construction
+  // context as having started construction.
+  // NOTE: this must be placed at the beginning of the SparkContext constructor.
   SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
@@ -1429,7 +1428,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
-  // context as having finished construction
+  // context as having finished construction.
+  // NOTE: this must be placed at the end of the SparkContext constructor.
   SparkContext.setActiveContext(this, allowMultipleContexts)
 }
 
@@ -1473,9 +1473,14 @@ object SparkContext extends Logging {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       contextBeingConstructed.foreach { otherContext =>
         if (otherContext ne sc) {  // checks for reference equality
+          // Since otherContext might point to a partially-constructed context, guard against
+          // its creationSite field being null:
+          val otherContextCreationSite =
+            Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
           val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
             " constructor). This may indicate an error, since only one SparkContext may be" +
-            " running in this JVM (see SPARK-2243)."
+            " running in this JVM (see SPARK-2243)." +
+            s" The other SparkContext was created at:\n$otherContextCreationSite"
           logWarning(warnMsg)
         }
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index fbab16630a0d0..365c7d78f4833 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
  *
  * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
- * creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details.
+ * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
  */
 class JavaSparkContext(val sc: SparkContext)
   extends JavaSparkContextVarargsWorkaround with Closeable {
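
To make the user-visible contract of this series concrete, the following is a minimal driver sketch; it assumes a Spark build that contains these patches and a local master, and the exception message it prints is the one produced by assertNoOtherContextIsRunning above.

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object SingleActiveContextDemo {
  def main(args: Array[String]): Unit = {
    val sc1 = new SparkContext(new SparkConf().setMaster("local").setAppName("first"))

    // While sc1 is active, a second constructor call in the same JVM is expected to fail fast.
    try {
      new SparkContext(new SparkConf().setMaster("local").setAppName("second"))
    } catch {
      case e: SparkException =>
        println("Second context rejected: " + e.getMessage)
    }

    // Stopping the active context clears the guard, so a new context can be created afterwards.
    sc1.stop()
    val sc2 = new SparkContext(new SparkConf().setMaster("local").setAppName("third"))
    sc2.stop()
  }
}

Setting spark.driver.allowMultipleContexts=true in the SparkConf downgrades the failure to a logged warning, as the assertNoOtherContextIsRunning change above shows.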
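
The construction-guard pattern itself is independent of Spark. Below is a simplified, hypothetical sketch of the same idea; the names Service, markPartiallyConstructed, setActive, and clearActive are illustrative only and are not Spark APIs.

// Simplified, hypothetical illustration of the constructor guard; not Spark code.
class Service {
  // NOTE: must be the first statement of the constructor, so that a concurrent or failed
  // construction is visible to other threads.
  Service.markPartiallyConstructed(this)

  // ... the real constructor work would happen here ...

  // NOTE: must be the last statement of the constructor, so that only fully-constructed
  // instances ever become the active one.
  Service.setActive(this)

  def stop(): Unit = Service.clearActive()
}

object Service {
  // Guards the two fields below.
  private val CONSTRUCTOR_LOCK = new Object()
  private var activeInstance: Option[Service] = None
  private var instanceBeingConstructed: Option[Service] = None

  // Callers must hold CONSTRUCTOR_LOCK.
  private def assertNoOtherRunning(s: Service): Unit = {
    instanceBeingConstructed.foreach { other =>
      if (other ne s) {  // reference inequality: another instance is mid-construction or failed
        System.err.println("warning: another Service is being constructed in this JVM")
      }
    }
    activeInstance.foreach { _ =>
      throw new IllegalStateException("only one Service may be active per JVM; stop() it first")
    }
  }

  private def markPartiallyConstructed(s: Service): Unit = CONSTRUCTOR_LOCK.synchronized {
    assertNoOtherRunning(s)
    instanceBeingConstructed = Some(s)
  }

  private def setActive(s: Service): Unit = CONSTRUCTOR_LOCK.synchronized {
    assertNoOtherRunning(s)
    instanceBeingConstructed = None
    activeInstance = Some(s)
  }

  private def clearActive(): Unit = CONSTRUCTOR_LOCK.synchronized {
    activeInstance = None
  }
}

Marking happens as the very first and very last constructor statements so that a constructor that throws leaves the "being constructed" marker set, which is exactly the state the warning in assertNoOtherContextIsRunning reports.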