From 96fe274beeffa003df56dac724a6d475641d7383 Mon Sep 17 00:00:00 2001 From: comcmipi Date: Thu, 9 Oct 2014 17:00:03 +0200 Subject: [PATCH 1/2] Update RecoverableNetworkWordCount.scala Trying this example, I missed the moment when the checkpoint was iniciated --- .../spark/examples/streaming/RecoverableNetworkWordCount.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 6af3a0f33efc2..04e41b9d02e92 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -116,6 +116,7 @@ object RecoverableNetworkWordCount { () => { createContext(ip, port, outputPath) }) + ssc.checkpoint(checkpointDirectory) ssc.start() ssc.awaitTermination() } From b6d800148cb80c2bd2c43dca5b78af962a89fcb5 Mon Sep 17 00:00:00 2001 From: comcmipi Date: Sat, 8 Nov 2014 10:59:09 +0100 Subject: [PATCH 2/2] Update RecoverableNetworkWordCount.scala Ok, I've added ssc.checkpoint(checkpointDirectory) to createContext. First, I wasn't sure that the checkpoin is initiated when the context is recreated from checkpoinDirector. That's why I put it outside createContext. --- .../examples/streaming/RecoverableNetworkWordCount.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 04e41b9d02e92..d90a84d794d72 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam object RecoverableNetworkWordCount { - def createContext(ip: String, port: Int, outputPath: String) = { + def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint @@ -79,6 +79,7 @@ object RecoverableNetworkWordCount { val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) + ssc.checkpoint(checkpointDirectory) // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -114,9 +115,8 @@ object RecoverableNetworkWordCount { val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { - createContext(ip, port, outputPath) + createContext(ip, port, outputPath, checkpointDirectory) }) - ssc.checkpoint(checkpointDirectory) ssc.start() ssc.awaitTermination() }