Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
 * A function that takes no arguments and produces a result of type R.
 * The interface extends Serializable, and call() is allowed to throw any Exception.
 */
public interface Function0<R> extends Serializable {
  R call() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ object Checkpoint extends Logging {
}

/** Get checkpoint files present in the given directory, ordered by oldest-first */
def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unrelated to this fix. I think we can simply do a null check inside this method and create a FileSystem if needed to avoid unnecessary changes to the calls (all the fs being passed in changing to Some(fs)) -- keeps git history sane.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I added this is so that we do not have to handle nulls. Dealing with nulls is severely frowned upon in Scala, and that is precisely why Option was introduced. There are many places where this has been done, and I have been slowly fixing those. I think this is a small enough change (it doesn't change functionality or existing code paths) that it is okay to do this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this file has to change anyways in the attempt to make the semantics of read more clear.


def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
(time1 < time2) || (time1 == time2 && bk1)
}

val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
Expand Down Expand Up @@ -157,7 +159,7 @@ class CheckpointWriter(
}

// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
logInfo("Deleting " + file)
Expand Down Expand Up @@ -229,15 +231,24 @@ class CheckpointWriter(
private[streaming]
object CheckpointReader extends Logging {

def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
{
/**
* Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
* files, then return None, else try to return the latest valid checkpoint object. If no
* checkpoint files could be read correctly, then return None (if ignoreReadError = true),
* or throw exception (if ignoreReadError = false).
*/
def read(
checkpointDir: String,
conf: SparkConf,
hadoopConf: Configuration,
ignoreReadError: Boolean = false): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)

// TODO(rxin): Why is this a def?!
def fs = checkpointPath.getFileSystem(hadoopConf)

// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
if (checkpointFiles.isEmpty) {
return None
}
Expand Down Expand Up @@ -271,7 +282,10 @@ object CheckpointReader extends Logging {
})

// If none of checkpoint files could be read, then throw exception
throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
if (!ignoreReadError) {
throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
}
None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ class StreamingContext private[streaming] (
*/
def this(path: String) = this(path, new Configuration)

/**
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
* @param path Path to the directory that was specified as the checkpoint directory
* @param sparkContext Existing SparkContext
*/
def this(path: String, sparkContext: SparkContext) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably have scaladoc to explain that it restores from a checkpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Damn, missed that.

this(
sparkContext,
CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
null)
}


if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
Expand All @@ -115,10 +128,12 @@ class StreamingContext private[streaming] (
private[streaming] val isCheckpointPresent = (cp_ != null)

private[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
if (sc_ != null) {
sc_
} else if (isCheckpointPresent) {
new SparkContext(cp_.createSparkConf())
} else {
sc_
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
}

Expand All @@ -129,7 +144,7 @@ class StreamingContext private[streaming] (

private[streaming] val conf = sc.conf

private[streaming] val env = SparkEnv.get
private[streaming] val env = sc.env

private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
Expand Down Expand Up @@ -174,7 +189,9 @@ class StreamingContext private[streaming] (

/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
SparkEnv.get.metricsSystem.registerSource(streamingSource)
assert(env != null)
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
Expand Down Expand Up @@ -621,19 +638,59 @@ object StreamingContext extends Logging {
hadoopConf: Configuration = new Configuration(),
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try {
CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
} catch {
case e: Exception =>
if (createOnError) {
None
} else {
throw e
}
}
val checkpointOption = CheckpointReader.read(
checkpointPath, new SparkConf(), hadoopConf, createOnError)
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}


/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by calling the provided `creatingFunc` on the provided `sparkContext`. Note
* that the SparkConf configuration in the checkpoint data will not be restored as the
* SparkContext has already been created.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext using the given SparkContext
* @param sparkContext SparkContext using which the StreamingContext will be created
*/
def getOrCreate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this particular overloaded method? Couldn't you just provide false as a default argument to createOnError in the next line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because there is already a version of getOrCreate with default arguments, and there can be only one overloaded version with default args.

checkpointPath: String,
creatingFunc: SparkContext => StreamingContext,
sparkContext: SparkContext
): StreamingContext = {
getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then the StreamingContext is
 * recreated from that data using the provided `sparkContext`. If the data does not exist,
 * then the StreamingContext is created by calling the provided `creatingFunc` on the
 * provided `sparkContext`. Note that the SparkConf configuration in the checkpoint data
 * will not be restored as the SparkContext has already been created.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
 * @param sparkContext   SparkContext using which the StreamingContext will be created
 * @param createOnError  Whether to create a new StreamingContext if there is an error in
 *                       reading checkpoint data. The flag is forwarded to
 *                       `CheckpointReader.read` as its last argument.
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: SparkContext => StreamingContext,
    sparkContext: SparkContext,
    createOnError: Boolean
  ): StreamingContext = {
  val restoredCheckpoint = CheckpointReader.read(
    checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
  restoredCheckpoint match {
    // A checkpoint was read: rebuild the context on top of the existing SparkContext.
    case Some(checkpoint) => new StreamingContext(sparkContext, checkpoint, null)
    // Nothing to restore: let the caller-supplied function build a fresh context.
    case None => creatingFunc(sparkContext)
  }
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.hadoop.conf.Configuration

/**
* A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
Expand Down Expand Up @@ -655,6 +656,7 @@ object JavaStreamingContext {
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
*/
@deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: missing a y in "Factory"; same for other annotations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Good catch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs to be fixed on this line!

def getOrCreate(
checkpointPath: String,
factory: JavaStreamingContextFactory
Expand All @@ -676,6 +678,7 @@ object JavaStreamingContext {
* @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
* file system
*/
@deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
def getOrCreate(
checkpointPath: String,
hadoopConf: Configuration,
Expand All @@ -700,6 +703,7 @@ object JavaStreamingContext {
* @param createOnError Whether to create a new JavaStreamingContext if there is an
* error in reading checkpoint data.
*/
@deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
def getOrCreate(
checkpointPath: String,
hadoopConf: Configuration,
Expand All @@ -712,6 +716,117 @@ object JavaStreamingContext {
new JavaStreamingContext(ssc)
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided
 * `creatingFunc` will be called to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
 * @param creatingFunc   Function to create a new JavaStreamingContext
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: JFunction0[JavaStreamingContext]
  ): JavaStreamingContext = {
  // Adapt the Java function to the Scala function expected by StreamingContext.getOrCreate.
  val createScalaContext: () => StreamingContext = () => creatingFunc.call().ssc
  new JavaStreamingContext(StreamingContext.getOrCreate(checkpointPath, createScalaContext))
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided
 * `creatingFunc` will be called to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new JavaStreamingContext
 * @param hadoopConf     Hadoop configuration if necessary for reading from any HDFS compatible
 *                       file system
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: JFunction0[JavaStreamingContext],
    hadoopConf: Configuration
  ): JavaStreamingContext = {
  // Adapt the Java function to the Scala function expected by StreamingContext.getOrCreate.
  val createScalaContext: () => StreamingContext = () => creatingFunc.call().ssc
  new JavaStreamingContext(
    StreamingContext.getOrCreate(checkpointPath, createScalaContext, hadoopConf))
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided
 * `creatingFunc` will be called to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new JavaStreamingContext
 * @param hadoopConf     Hadoop configuration if necessary for reading from any HDFS compatible
 *                       file system
 * @param createOnError  Whether to create a new JavaStreamingContext if there is an
 *                       error in reading checkpoint data.
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: JFunction0[JavaStreamingContext],
    hadoopConf: Configuration,
    createOnError: Boolean
  ): JavaStreamingContext = {
  // Adapt the Java function to the Scala function expected by StreamingContext.getOrCreate.
  val createScalaContext: () => StreamingContext = () => creatingFunc.call().ssc
  new JavaStreamingContext(
    StreamingContext.getOrCreate(checkpointPath, createScalaContext, hadoopConf, createOnError))
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided
 * `creatingFunc` will be called with a JavaSparkContext to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new JavaStreamingContext
 * @param sparkContext   SparkContext using which the StreamingContext will be created
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
    sparkContext: JavaSparkContext
  ): JavaStreamingContext = {
  // Wrap the Scala SparkContext handed back by StreamingContext.getOrCreate in a
  // JavaSparkContext before calling the Java function, then unwrap the result.
  val createScalaContext: SparkContext => StreamingContext =
    sc => creatingFunc.call(new JavaSparkContext(sc)).ssc
  new JavaStreamingContext(
    StreamingContext.getOrCreate(checkpointPath, createScalaContext, sparkContext.sc))
}

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided
 * `creatingFunc` will be called with a JavaSparkContext to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new JavaStreamingContext
 * @param sparkContext   SparkContext using which the StreamingContext will be created
 * @param createOnError  Whether to create a new JavaStreamingContext if there is an
 *                       error in reading checkpoint data.
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
    sparkContext: JavaSparkContext,
    createOnError: Boolean
  ): JavaStreamingContext = {
  // Wrap the Scala SparkContext handed back by StreamingContext.getOrCreate in a
  // JavaSparkContext before calling the Java function, then unwrap the result.
  val createScalaContext: SparkContext => StreamingContext =
    sc => creatingFunc.call(new JavaSparkContext(sc)).ssc
  new JavaStreamingContext(
    StreamingContext.getOrCreate(
      checkpointPath, createScalaContext, sparkContext.sc, createOnError))
}

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
Expand Down
Loading