10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2350,6 +2350,16 @@ object SparkContext extends Logging {
}
}

private[spark] def getActiveContext(): Option[SparkContext] = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
Option(activeContext.get())
}
}

private[spark] def stopActiveContext(): Unit = {
Contributor: I don't know. I'm not a big fan of the approach you're taking here: calling this method before running tests. That feels like a sledgehammer to fix flaky tests. I think it would be better for test code to be more careful about cleaning up after itself, much as most tests in spark-core use LocalSparkContext to do that more or less automatically, without the need for these methods.

The ReuseableSparkContext trait you have is a step in that direction. If you make sure all the streaming tests that need it use it, and keep this state within that class, I think it would be a better change.

Member: +1. I don't like stopping the SparkContext before running tests, either; it will hide mistakes in other tests.

Member: Shouldn't this be unnecessary with more carefully written tests that always close the context when they're done?

Author: I have to admit that the approach is far from subtle.

It seems that #16105 fixes this (also on my branch). I am closing this for now. Thanks for the feedback.

getActiveContext().foreach(_.stop())
}

/**
* Called at the end of the SparkContext constructor to ensure that no other SparkContext has
* raced with this constructor and started.
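The test-suite changes below assume a ReuseableSparkContext base trait that supplies a shared sc, an extraSparkConf hook, and a reuseContext flag; its implementation is not included in this diff. The following is only a minimal sketch of what such a trait could look like, built on the two helpers added above. The names sc, extraSparkConf, and reuseContext come from the suites in this diff; everything else (the local[2] master, the use of getOrCreate, the beforeAll/afterAll shape) is an assumption.

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}

trait ReuseableSparkContext extends SparkFunSuite with BeforeAndAfterAll {

  // Extra configuration a suite wants set on the (possibly shared) SparkContext.
  def extraSparkConf: Map[String, String] = Map.empty

  // Suites that cannot share a context override this to false.
  val reuseContext: Boolean = true

  protected var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    if (!reuseContext) {
      // Make sure nothing left over from a previous suite gets reused.
      SparkContext.stopActiveContext()
    }
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(getClass.getSimpleName)
    extraSparkConf.foreach { case (k, v) => conf.set(k, v) }
    // Reuse the active context when one exists, otherwise create a fresh one.
    sc = SparkContext.getOrCreate(conf)
  }

  override def afterAll(): Unit = {
    try {
      if (!reuseContext) {
        SparkContext.getActiveContext().foreach(_.stop())
      }
    } finally {
      super.afterAll()
    }
  }
}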
@@ -18,6 +18,7 @@
package org.apache.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext$;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -28,6 +29,7 @@ public abstract class LocalJavaStreamingContext {

@Before
public void setUp() {
SparkContext$.MODULE$.stopActiveContext();
Contributor: Can't you just call SparkContext.stopActiveContext() from Java? I see static methods in the bytecode for things like jarOfObject.

The line is also indented incorrectly.

Author: IntelliJ and SBT were both complaining, so I did this.

I'll try to rebuild and see what happens.
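For background on the exchange above: a method defined on a Scala object compiles to a singleton class whose instance is exposed through a static MODULE$ field, and the compiler usually also emits a static forwarder on the companion class, which is what makes the plain Java-style call work when it does. A small illustration (plain Scala, not Spark code):

class Widget
object Widget {
  def reset(): Unit = println("resetting")
}

// The compiler produces, roughly:
//   class Widget$ with a `public static final Widget$ MODULE$` field and an instance method reset()
//   class Widget  with a static forwarder `public static void reset()`, when nothing prevents it
//
// So from Java, `Widget$.MODULE$.reset()` always works, while `Widget.reset()` works only
// when the static forwarder was actually generated.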

SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
@@ -620,7 +620,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}

test("slice") {
withStreamingContext(new StreamingContext(conf, Seconds(1))) { ssc =>
withStreamingContext(Seconds(1)) { ssc =>
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
stream.foreachRDD(_ => {}) // Dummy output stream
@@ -637,7 +637,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}
}
test("slice - has not been initialized") {
withStreamingContext(new StreamingContext(conf, Seconds(1))) { ssc =>
withStreamingContext(Seconds(1)) { ssc =>
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
val thrown = intercept[SparkException] {
@@ -657,7 +657,7 @@ class BasicOperationsSuite extends TestSuiteBase {
.window(Seconds(4), Seconds(2))
}

val operatedStream = runCleanupTest(conf, operation _,
val operatedStream = runCleanupTest(operation _,
numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
@@ -694,7 +694,7 @@ class BasicOperationsSuite extends TestSuiteBase {
Some(values.sum + state.getOrElse(0))
}
val stateStream = runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
_.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))

assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
assert(stateStream.generatedRDDs.contains(Time(10000)))
@@ -705,7 +705,7 @@ class BasicOperationsSuite extends TestSuiteBase {
// Actually receive data over through receiver to create BlockRDDs

withTestServer(new TestServer()) { testServer =>
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
testServer.start()

val batchCounter = new BatchCounter(ssc)
@@ -781,7 +781,6 @@ class BasicOperationsSuite extends TestSuiteBase {

/** Test cleanup of RDDs in DStream metadata */
def runCleanupTest[T: ClassTag](
conf2: SparkConf,
operation: DStream[Int] => DStream[T],
numExpectedOutput: Int = cleanupTestInput.size,
rememberDuration: Duration = null
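Several call sites in this suite (and in the suites below) now call withStreamingContext with no StreamingContext argument, either bare or with just a batch duration, instead of withStreamingContext(new StreamingContext(conf, ...)). The corresponding TestSuiteBase change is not shown in this diff; the following is a rough sketch of the duration-based form these call sites appear to rely on, with the zero-argument form presumably defaulting to the suite's batchDuration:

// Sketch only: build the StreamingContext from the suite's shared SparkContext `sc`
// instead of a fresh SparkConf, and keep that SparkContext alive afterwards.
def withStreamingContext[R](duration: Duration)(block: StreamingContext => R): R = {
  val ssc = new StreamingContext(sc, duration)
  try {
    block(ssc)
  } finally {
    // Stop the streaming machinery but leave the shared SparkContext running.
    ssc.stop(stopSparkContext = false)
  }
}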
@@ -211,6 +211,8 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
with ResetSystemProperties {

override val reuseContext: Boolean = false

var ssc: StreamingContext = null

override def batchDuration: Duration = Milliseconds(500)
@@ -238,8 +240,6 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester

assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")

conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
// this ensure checkpointing occurs at least once
@@ -571,7 +571,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

test("recovery maintains rate controller") {
ssc = new StreamingContext(conf, batchDuration)
ssc = new StreamingContext(sc, batchDuration)
ssc.checkpoint(checkpointDir)

val dstream = new RateTestInputDStream(ssc) {
@@ -635,7 +635,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
try {
// This is a var because it's re-assigned when we restart from a checkpoint
var clock: ManualClock = null
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext(batchDuration) { ssc =>
ssc.checkpoint(checkpointDir)
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
@@ -760,7 +760,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

test("DStreamCheckpointData.restore invoking times") {
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
ssc.checkpoint(checkpointDir)
val inputDStream = new CheckpointInputDStream(ssc)
val checkpointData = inputDStream.checkpointData
@@ -822,7 +822,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val jobGenerator = mock(classOf[JobGenerator])
val checkpointDir = Utils.createTempDir().toString
val checkpointWriter =
new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration())
new CheckpointWriter(jobGenerator, sc.conf, checkpointDir, new Configuration())
val bytes1 = Array.fill[Byte](10)(1)
new checkpointWriter.CheckpointWriteHandler(
Time(2000), bytes1, clearCheckpointDataLater = false).run()
@@ -869,6 +869,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
// Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
// all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
// connections between layer 2 and layer 3)
stopActiveContext()
ssc = new StreamingContext(master, framework, batchDuration)
val batchCounter = new BatchCounter(ssc)
ssc.checkpoint(checkpointDir)
@@ -19,28 +19,25 @@ package org.apache.spark.streaming

import java.io.NotSerializableException

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{HashPartitioner, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.{HashPartitioner, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.ReturnStatementInClosureException

/**
* Test that closures passed to DStream operations are actually cleaned.
*/
class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private var ssc: StreamingContext = null
class DStreamClosureSuite extends ReuseableSparkContext {
private var ssc: StreamingContext = _

override def beforeAll(): Unit = {
super.beforeAll()
val sc = new SparkContext("local", "test")
ssc = new StreamingContext(sc, Seconds(1))
}

override def afterAll(): Unit = {
try {
ssc.stop(stopSparkContext = true)
ssc.stop()
ssc = null
} finally {
super.afterAll()
@@ -30,20 +30,23 @@ import org.apache.spark.util.ManualClock
/**
* Tests whether scope information is passed from DStream operations to RDDs correctly.
*/
class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterAll {
private var ssc: StreamingContext = null
private val batchDuration: Duration = Seconds(1)
class DStreamScopeSuite extends ReuseableSparkContext {
private var ssc: StreamingContext = _

// Configurations to add to a new or existing spark context.
override def extraSparkConf: Map[String, String] = {
// Use a manual clock
super.extraSparkConf ++ Map("spark.streaming.clock" -> "org.apache.spark.util.ManualClock")
}

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
ssc = new StreamingContext(new SparkContext(conf), batchDuration)
ssc = new StreamingContext(sc, Seconds(1))
}

override def afterAll(): Unit = {
try {
ssc.stop(stopSparkContext = true)
ssc.stop()
} finally {
super.afterAll()
}
@@ -35,6 +35,11 @@ class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging {
private val numBatches = 30
private var directory: File = null

override protected def beforeAll(): Unit = {
super.beforeAll()
SparkContext.getActiveContext().foreach(_.stop())
}

before {
directory = Utils.createTempDir()
}
@@ -46,7 +51,7 @@ class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging {
StreamingContext.getActive().foreach { _.stop() }

// Stop SparkContext if active
SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("bla")).stop()
SparkContext.getActiveContext().foreach(_.stop())
}

test("multiple failures with map") {
@@ -49,7 +49,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
ssc.addStreamingListener(ssc.progressListener)

val input = Seq(1, 2, 3, 4, 5)
@@ -112,7 +112,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
withTestServer(new TestServer()) { testServer =>
testServer.start()

withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
ssc.addStreamingListener(ssc.progressListener)

val batchCounter = new BatchCounter(ssc)
@@ -149,7 +149,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext(batchDuration) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
// This `setTime` call ensures that the clock is past the creation time of `existingFile`
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
@@ -213,7 +213,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val pathWithWildCard = testDir.toString + "/*/"

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext(batchDuration) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
val batchCounter = new BatchCounter(ssc)
@@ -270,7 +270,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)

// set up the network stream using the test receiver
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count

@@ -305,7 +305,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputStream = new TestOutputStream(queueStream, outputQueue)
@@ -350,7 +350,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>
val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputStream = new TestOutputStream(queueStream, outputQueue)
@@ -396,7 +396,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("test track the number of input stream") {
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext { ssc =>

class TestInputDStream extends InputDStream[String](ssc) {
def start() {}
@@ -434,7 +434,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
withStreamingContext(batchDuration) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
// This `setTime` call ensures that the clock is past the creation time of `existingFile`
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
@@ -23,20 +23,21 @@ import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.PrivateMethodTester._

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.streaming.dstream.{DStream, InternalMapWithStateDStream, MapWithStateDStream, MapWithStateDStreamImpl}
import org.apache.spark.util.{ManualClock, Utils}

class MapWithStateSuite extends SparkFunSuite
with DStreamCheckpointTester with BeforeAndAfterAll with BeforeAndAfter {
class MapWithStateSuite extends ReuseableSparkContext with DStreamCheckpointTester {

private var sc: SparkContext = null
protected var checkpointDir: File = null
protected val batchDuration = Seconds(1)

override def extraSparkConf: Map[String, String] = {
// Use a manual clock
super.extraSparkConf ++ Map("spark.streaming.clock" -> "org.apache.spark.util.ManualClock")
}

before {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
checkpointDir = Utils.createTempDir("checkpoint")
@@ -49,23 +50,6 @@ class MapWithStateSuite extends SparkFunSuite
}
}

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
}

override def afterAll(): Unit = {
try {
if (sc != null) {
sc.stop()
}
} finally {
super.afterAll()
}
}

test("state - get, exists, update, remove, ") {
var state: StateImpl[Int] = null

@@ -75,6 +75,11 @@ class ReceivedBlockHandlerSuite
var storageLevel: StorageLevel = null
var tempDirectory: File = null

override def beforeAll(): Unit = {
super.beforeAll()
SparkContext.getActiveContext().foreach(_.stop())
}

before {
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
@@ -107,6 +112,8 @@ class ReceivedBlockHandlerSuite
rpcEnv.awaitTermination()
rpcEnv = null

sc.stop()

Utils.deleteRecursively(tempDirectory)
}
