Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
if (sc_ != null) {
sc_
} else if (isCheckpointPresent) {
new SparkContext(cp_.createSparkConf())
SparkContext.getOrCreate(cp_.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
Expand Down Expand Up @@ -750,53 +750,6 @@ object StreamingContext extends Logging {
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
* that the SparkConf configuration in the checkpoint data will not be restored as the
* SparkContext has already been created.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
* @param sparkContext SparkContext using which the StreamingContext will be created
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: SparkContext => StreamingContext,
sparkContext: SparkContext
): StreamingContext = {
getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
* that the SparkConf configuration in the checkpoint data will not be restored as the
* SparkContext has already been created.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
* @param sparkContext SparkContext using which the StreamingContext will be created
* @param createOnError Whether to create a new StreamingContext if there is an
* error in reading checkpoint data. By default, an exception will be
* thrown on error.
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: SparkContext => StreamingContext,
sparkContext: SparkContext,
createOnError: Boolean
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
checkpointOption.map(new StreamingContext(sparkContext, _, null))
.getOrElse(creatingFunc(sparkContext))
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,51 +804,6 @@ object JavaStreamingContext {
new JavaStreamingContext(ssc)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
* @param sparkContext SparkContext using which the StreamingContext will be created
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
sparkContext: JavaSparkContext
): JavaStreamingContext = {
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
}, sparkContext.sc)
new JavaStreamingContext(ssc)
}

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
* @param sparkContext SparkContext using which the StreamingContext will be created
* @param createOnError Whether to create a new JavaStreamingContext if there is an
* error in reading checkpoint data.
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
sparkContext: JavaSparkContext,
createOnError: Boolean
): JavaStreamingContext = {
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
}, sparkContext.sc, createOnError)
new JavaStreamingContext(ssc)
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1766,29 +1766,10 @@ public JavaStreamingContext call() {
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();

// Function to create JavaStreamingContext using existing JavaSparkContext
// without any output operations (used to detect the new context)
Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
new Function<JavaSparkContext, JavaStreamingContext>() {
public JavaStreamingContext call(JavaSparkContext context) {
newContextCreated.set(true);
return new JavaStreamingContext(context, Seconds.apply(1));
}
};

JavaSparkContext sc = new JavaSparkContext(conf);
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop(false);

newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop(false);

newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
JavaSparkContext sc = new JavaSparkContext(conf);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
new org.apache.hadoop.conf.Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue")
}
}

test("getOrCreate with existing SparkContext") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
sc = new SparkContext(conf)

// Function to create StreamingContext that has a config to identify it to be new context
var newContextCreated = false
def creatingFunction(sparkContext: SparkContext): StreamingContext = {
newContextCreated = true
new StreamingContext(sparkContext, batchDuration)
}

// Call ssc.stop(stopSparkContext = false) after a body of cody
def testGetOrCreate(body: => Unit): Unit = {
newContextCreated = false
try {
body
} finally {
if (ssc != null) {
ssc.stop(stopSparkContext = false)
}
ssc = null
}
}

val emptyPath = Utils.createTempDir().getAbsolutePath()

// getOrCreate should create new context with empty path
testGetOrCreate {
ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
}

val corrutedCheckpointPath = createCorruptedCheckpoint()

// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
}

// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
}

// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
}

val checkpointPath = createValidCheckpoint()

// StreamingContext.getOrCreate should recover context with checkpoint path
// getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
sc = new SparkContext(conf)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the whole unit test to test the deleted API has been replaced by a sub-unit-test that tests older API's ability to use an existing SparkContext.

ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
assert(!ssc.conf.contains("someKey"),
"recovered StreamingContext unexpectedly has old config")
assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
}
}

Expand Down