Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
Expand Down Expand Up @@ -100,7 +101,7 @@ object Checkpoint extends Logging {
}

val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.serializer.SerializationDebugger
Expand Down Expand Up @@ -110,7 +111,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(path, new Configuration)
def this(path: String) = this(path, SparkHadoopUtil.get.conf)

/**
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
Expand Down Expand Up @@ -803,7 +804,7 @@ object StreamingContext extends Logging {
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
ACTIVATION_LOCK.synchronized {
Expand All @@ -828,7 +829,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F],
conf: Configuration = new Configuration) {
conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
Expand Down Expand Up @@ -136,7 +137,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(new StreamingContext(path, new Configuration))
def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))

/**
* Re-creates a JavaStreamingContext from a checkpoint file.
Expand Down