Skip to content

Commit bce00da

Browse files
committed
[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala API. The drawbacks are. * Hard to implement in python. * New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553 Furthermore, there is now a direct way get an existing active SparkContext or create a new on - SparkContext.getOrCreate(conf). Its better to use this to get the SparkContext rather than have a new API to explicitly pass the context. So in this PR I have * Removed the new versions of StreamingContext.getOrCreate() which took SparkContext * Added the ability to pick up existing SparkContext when the StreamingContext tries to create a SparkContext. Author: Tathagata Das <[email protected]> Closes #6096 from tdas/SPARK-6752 and squashes the following commits: 53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752 f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
1 parent 59aaa1d commit bce00da

File tree

4 files changed

+9
-180
lines changed

4 files changed

+9
-180
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
134134
if (sc_ != null) {
135135
sc_
136136
} else if (isCheckpointPresent) {
137-
new SparkContext(cp_.createSparkConf())
137+
SparkContext.getOrCreate(cp_.createSparkConf())
138138
} else {
139139
throw new SparkException("Cannot create StreamingContext without a SparkContext")
140140
}
@@ -750,53 +750,6 @@ object StreamingContext extends Logging {
750750
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
751751
}
752752

753-
/**
754-
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
755-
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
756-
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
757-
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
758-
* that the SparkConf configuration in the checkpoint data will not be restored as the
759-
* SparkContext has already been created.
760-
*
761-
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
762-
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
763-
* @param sparkContext SparkContext using which the StreamingContext will be created
764-
*/
765-
def getOrCreate(
766-
checkpointPath: String,
767-
creatingFunc: SparkContext => StreamingContext,
768-
sparkContext: SparkContext
769-
): StreamingContext = {
770-
getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
771-
}
772-
773-
/**
774-
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
775-
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
776-
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
777-
* will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
778-
* that the SparkConf configuration in the checkpoint data will not be restored as the
779-
* SparkContext has already been created.
780-
*
781-
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
782-
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
783-
* @param sparkContext SparkContext using which the StreamingContext will be created
784-
* @param createOnError Whether to create a new StreamingContext if there is an
785-
* error in reading checkpoint data. By default, an exception will be
786-
* thrown on error.
787-
*/
788-
def getOrCreate(
789-
checkpointPath: String,
790-
creatingFunc: SparkContext => StreamingContext,
791-
sparkContext: SparkContext,
792-
createOnError: Boolean
793-
): StreamingContext = {
794-
val checkpointOption = CheckpointReader.read(
795-
checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
796-
checkpointOption.map(new StreamingContext(sparkContext, _, null))
797-
.getOrElse(creatingFunc(sparkContext))
798-
}
799-
800753
/**
801754
* Find the JAR from which a given class was loaded, to make it easy for users to pass
802755
* their JARs to StreamingContext.

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -804,51 +804,6 @@ object JavaStreamingContext {
804804
new JavaStreamingContext(ssc)
805805
}
806806

807-
/**
808-
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
809-
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
810-
* recreated from the checkpoint data. If the data does not exist, then the provided factory
811-
* will be used to create a JavaStreamingContext.
812-
*
813-
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
814-
* @param creatingFunc Function to create a new JavaStreamingContext
815-
* @param sparkContext SparkContext using which the StreamingContext will be created
816-
*/
817-
def getOrCreate(
818-
checkpointPath: String,
819-
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
820-
sparkContext: JavaSparkContext
821-
): JavaStreamingContext = {
822-
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
823-
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
824-
}, sparkContext.sc)
825-
new JavaStreamingContext(ssc)
826-
}
827-
828-
/**
829-
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
830-
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
831-
* recreated from the checkpoint data. If the data does not exist, then the provided factory
832-
* will be used to create a JavaStreamingContext.
833-
*
834-
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
835-
* @param creatingFunc Function to create a new JavaStreamingContext
836-
* @param sparkContext SparkContext using which the StreamingContext will be created
837-
* @param createOnError Whether to create a new JavaStreamingContext if there is an
838-
* error in reading checkpoint data.
839-
*/
840-
def getOrCreate(
841-
checkpointPath: String,
842-
creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
843-
sparkContext: JavaSparkContext,
844-
createOnError: Boolean
845-
): JavaStreamingContext = {
846-
val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
847-
creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
848-
}, sparkContext.sc, createOnError)
849-
new JavaStreamingContext(ssc)
850-
}
851-
852807
/**
853808
* Find the JAR from which a given class was loaded, to make it easy for users to pass
854809
* their JARs to StreamingContext.

streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,29 +1766,10 @@ public JavaStreamingContext call() {
17661766
Assert.assertTrue("old context not recovered", !newContextCreated.get());
17671767
ssc.stop();
17681768

1769-
// Function to create JavaStreamingContext using existing JavaSparkContext
1770-
// without any output operations (used to detect the new context)
1771-
Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
1772-
new Function<JavaSparkContext, JavaStreamingContext>() {
1773-
public JavaStreamingContext call(JavaSparkContext context) {
1774-
newContextCreated.set(true);
1775-
return new JavaStreamingContext(context, Seconds.apply(1));
1776-
}
1777-
};
1778-
1779-
JavaSparkContext sc = new JavaSparkContext(conf);
1780-
newContextCreated.set(false);
1781-
ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
1782-
Assert.assertTrue("new context not created", newContextCreated.get());
1783-
ssc.stop(false);
1784-
17851769
newContextCreated.set(false);
1786-
ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
1787-
Assert.assertTrue("new context not created", newContextCreated.get());
1788-
ssc.stop(false);
1789-
1790-
newContextCreated.set(false);
1791-
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
1770+
JavaSparkContext sc = new JavaSparkContext(conf);
1771+
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
1772+
new org.apache.hadoop.conf.Configuration());
17921773
Assert.assertTrue("old context not recovered", !newContextCreated.get());
17931774
ssc.stop();
17941775
}

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 5 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
419419
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
420420
assert(ssc != null, "no context created")
421421
assert(!newContextCreated, "old context not recovered")
422-
assert(ssc.conf.get("someKey") === "someValue")
423-
}
424-
}
425-
426-
test("getOrCreate with existing SparkContext") {
427-
val conf = new SparkConf().setMaster(master).setAppName(appName)
428-
sc = new SparkContext(conf)
429-
430-
// Function to create StreamingContext that has a config to identify it to be new context
431-
var newContextCreated = false
432-
def creatingFunction(sparkContext: SparkContext): StreamingContext = {
433-
newContextCreated = true
434-
new StreamingContext(sparkContext, batchDuration)
435-
}
436-
437-
// Call ssc.stop(stopSparkContext = false) after a body of cody
438-
def testGetOrCreate(body: => Unit): Unit = {
439-
newContextCreated = false
440-
try {
441-
body
442-
} finally {
443-
if (ssc != null) {
444-
ssc.stop(stopSparkContext = false)
445-
}
446-
ssc = null
447-
}
448-
}
449-
450-
val emptyPath = Utils.createTempDir().getAbsolutePath()
451-
452-
// getOrCreate should create new context with empty path
453-
testGetOrCreate {
454-
ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
455-
assert(ssc != null, "no context created")
456-
assert(newContextCreated, "new context not created")
457-
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
422+
assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
458423
}
459424

460-
val corrutedCheckpointPath = createCorruptedCheckpoint()
461-
462-
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
463-
intercept[Exception] {
464-
ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
465-
}
466-
467-
// getOrCreate should throw exception with fake checkpoint file
468-
intercept[Exception] {
469-
ssc = StreamingContext.getOrCreate(
470-
corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
471-
}
472-
473-
// getOrCreate should create new context with fake checkpoint file and createOnError = true
474-
testGetOrCreate {
475-
ssc = StreamingContext.getOrCreate(
476-
corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
477-
assert(ssc != null, "no context created")
478-
assert(newContextCreated, "new context not created")
479-
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
480-
}
481-
482-
val checkpointPath = createValidCheckpoint()
483-
484-
// StreamingContext.getOrCreate should recover context with checkpoint path
425+
// getOrCreate should recover StreamingContext with existing SparkContext
485426
testGetOrCreate {
486-
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
427+
sc = new SparkContext(conf)
428+
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
487429
assert(ssc != null, "no context created")
488430
assert(!newContextCreated, "old context not recovered")
489-
assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
490-
assert(!ssc.conf.contains("someKey"),
491-
"recovered StreamingContext unexpectedly has old config")
431+
assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
492432
}
493433
}
494434

0 commit comments

Comments
 (0)