diff --git a/docs/configuration.md b/docs/configuration.md
index 685101ea5c9c..0eca23f94505 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -247,6 +247,7 @@ Apart from these, the following properties are also available, and may be useful
The results will be dumped as a separate file for each RDD. They can be loaded
by pstats.Stats(). If this is specified, the profile result will not be displayed
automatically.
+
| spark.python.worker.reuse |
@@ -653,6 +654,24 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
+
+| spark.executor.heartbeatInterval | 10000 | Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks. |
+
+#### Hadoop
+
+
+| Property Name | Default | Meaning |
+| --- | --- | --- |
+| spark.hadoop.[HadoopConfigVariable] | (none) | All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration. |
| spark.hadoop.validateOutputSpecs | true |
@@ -670,15 +689,9 @@ Apart from these, the following properties are also available, and may be useful
This is disabled by default in order to avoid unexpected performance regressions for jobs that
are not affected by these issues.
-
-| spark.executor.heartbeatInterval | 10000 | Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks. |
-
+
#### Networking
| Property Name | Default | Meaning |
| --- | --- | --- |
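
Aside (not part of the patch): a minimal Scala sketch of the behavior the two tables above describe, with the heartbeat interval set explicitly and a `spark.hadoop.*` key propagating into the Hadoop configuration. The S3 access-key property is the table's own example; the master and app name are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]") // assumption: local run
  .setAppName("conf-demo")
  // Interval (ms) between executor heartbeats; 10000 is the default.
  .set("spark.executor.heartbeatInterval", "10000")
  // Any spark.hadoop.* key is copied, prefix stripped, into the
  // Hadoop Configuration built by the SparkContext.
  .set("spark.hadoop.fs.s3.awsAccessKeyId", "MY_ACCESS_KEY")

val sc = new SparkContext(conf)
// The key is visible under its Hadoop name:
assert(sc.hadoopConfiguration.get("fs.s3.awsAccessKeyId") == "MY_ACCESS_KEY")
```
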
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 84eaf401f240..42164797d6e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): SchemaRDD = {
+ conf: Configuration = SparkHadoopUtil.get.conf): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
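
For callers, the visible effect is that omitting `conf` now inherits `spark.hadoop.*` settings instead of an empty `new Configuration()`. A sketch, with a hypothetical schema and path:

```scala
// Hypothetical record type; sqlContext is an existing SQLContext.
case class Record(key: Int, value: String)

// No `conf` argument: the default is now SparkHadoopUtil.get.conf,
// so spark.hadoop.* settings reach the Parquet writer as well.
val records = sqlContext.createParquetFile[Record](
  path = "hdfs:///tmp/records.parquet",
  allowExisting = false)
```
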
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e05b02..096e6f59f06b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): JavaSchemaRDD = {
+ conf: Configuration = SparkHadoopUtil.get.conf): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 23d6d1c5e50f..599b024c035d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -104,7 +105,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(path, new Configuration)
+ def this(path: String) = this(path, SparkHadoopUtil.get.conf)
if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
@@ -536,7 +537,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
- hadoopConf: Configuration = new Configuration(),
+ hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try {
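
Same pattern for `getOrCreate`: callers that never passed `hadoopConf` now get the Spark-derived configuration when reading the checkpoint. A sketch, with a hypothetical checkpoint directory:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/checkpoints" // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-demo")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc
}

// hadoopConf now defaults to SparkHadoopUtil.get.conf, so checkpoint
// recovery sees spark.hadoop.* settings without an explicit Configuration.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
```
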
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 59d4423086ef..6b744b8bbabe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -770,7 +771,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = new Configuration) {
+ conf: Configuration = SparkHadoopUtil.get.conf) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7db66c69a6d7..cf17f5ccbc7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(new StreamingContext(path, new Configuration))
+ def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
/**
* Re-creates a JavaStreamingContext from a checkpoint file.
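
The Java-facing constructor behaves the same way; a one-line sketch (path hypothetical):

```scala
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Reading the checkpoint now uses SparkHadoopUtil.get.conf under the hood.
val jssc = new JavaStreamingContext("hdfs:///tmp/checkpoints")
```
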
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8152b7542ac5..05d67e9bc251 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -137,7 +138,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
+ if (fs_ == null) fs_ = directoryPath.getFileSystem(SparkHadoopUtil.get.conf)
fs_
}
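
Practical consequence, as a sketch (bucket name hypothetical): a file stream over a store that needs credentials can now be configured entirely through `spark.hadoop.*` keys, since directory listing uses the Spark-derived configuration:

```scala
// ssc is an existing StreamingContext; credentials come from spark.hadoop.*
// keys on the SparkConf rather than a hadoop-site.xml on the classpath.
val lines = ssc.textFileStream("s3n://my-bucket/incoming")
```
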
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 9467595d307a..a6f3748c347c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
@@ -657,7 +658,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = new Configuration
+ conf: Configuration = SparkHadoopUtil.get.conf
) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
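
And for the Scala pair-DStream save path, a sketch of a call that relies on the new default (stream and paths hypothetical):

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// `pairs` is an existing DStream[(Text, Text)]; names are illustrative.
// Omitting `conf` now uses SparkHadoopUtil.get.conf, so spark.hadoop.*
// settings reach the output FileSystem as well.
pairs.saveAsNewAPIHadoopFiles(
  "hdfs:///tmp/out/events", "txt",
  classOf[Text], classOf[Text],
  classOf[TextOutputFormat[Text, Text]])
```
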