Skip to content

Commit 9307f56

Browse files
koeninger authored and tdas committed
[SPARK-9472] [STREAMING] consistent hadoop configuration, streaming only
Author: cody koeninger <[email protected]> Closes apache#7772 from koeninger/streaming-hadoop-config and squashes the following commits: 5267284 [cody koeninger] [SPARK-4229][Streaming] consistent hadoop configuration, streaming only
1 parent 3c66ff7 commit 9307f56

File tree

4 files changed

+9
-6
lines changed

4 files changed

+9
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2525
import org.apache.hadoop.conf.Configuration
2626

2727
import org.apache.spark.{SparkException, SparkConf, Logging}
28+
import org.apache.spark.deploy.SparkHadoopUtil
2829
import org.apache.spark.io.CompressionCodec
2930
import org.apache.spark.util.{MetadataCleaner, Utils}
3031
import org.apache.spark.streaming.scheduler.JobGenerator
@@ -100,7 +101,7 @@ object Checkpoint extends Logging {
100101
}
101102

102103
val path = new Path(checkpointDir)
103-
val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
104+
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
104105
if (fs.exists(path)) {
105106
val statuses = fs.listStatus(path)
106107
if (statuses != null) {

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
3434

3535
import org.apache.spark._
3636
import org.apache.spark.annotation.{DeveloperApi, Experimental}
37+
import org.apache.spark.deploy.SparkHadoopUtil
3738
import org.apache.spark.input.FixedLengthBinaryInputFormat
3839
import org.apache.spark.rdd.{RDD, RDDOperationScope}
3940
import org.apache.spark.serializer.SerializationDebugger
@@ -110,7 +111,7 @@ class StreamingContext private[streaming] (
110111
* Recreate a StreamingContext from a checkpoint file.
111112
* @param path Path to the directory that was specified as the checkpoint directory
112113
*/
113-
def this(path: String) = this(path, new Configuration)
114+
def this(path: String) = this(path, SparkHadoopUtil.get.conf)
114115

115116
/**
116117
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
@@ -803,7 +804,7 @@ object StreamingContext extends Logging {
803804
def getActiveOrCreate(
804805
checkpointPath: String,
805806
creatingFunc: () => StreamingContext,
806-
hadoopConf: Configuration = new Configuration(),
807+
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
807808
createOnError: Boolean = false
808809
): StreamingContext = {
809810
ACTIVATION_LOCK.synchronized {
@@ -828,7 +829,7 @@ object StreamingContext extends Logging {
828829
def getOrCreate(
829830
checkpointPath: String,
830831
creatingFunc: () => StreamingContext,
831-
hadoopConf: Configuration = new Configuration(),
832+
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
832833
createOnError: Boolean = false
833834
): StreamingContext = {
834835
val checkpointOption = CheckpointReader.read(

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -788,7 +788,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
788788
keyClass: Class[_],
789789
valueClass: Class[_],
790790
outputFormatClass: Class[F],
791-
conf: Configuration = new Configuration) {
791+
conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
792792
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
793793
}
794794

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.Experimental
3333
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
3434
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
3535
import org.apache.spark.api.java.function.{Function0 => JFunction0}
36+
import org.apache.spark.deploy.SparkHadoopUtil
3637
import org.apache.spark.rdd.RDD
3738
import org.apache.spark.storage.StorageLevel
3839
import org.apache.spark.streaming._
@@ -136,7 +137,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
136137
* Recreate a JavaStreamingContext from a checkpoint file.
137138
* @param path Path to the directory that was specified as the checkpoint directory
138139
*/
139-
def this(path: String) = this(new StreamingContext(path, new Configuration))
140+
def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
140141

141142
/**
142143
* Re-creates a JavaStreamingContext from a checkpoint file.

0 commit comments

Comments (0)