diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 60ee115e393ce..8be5d53aa7b3c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -79,30 +79,31 @@ class SparkHadoopUtil extends Logging {
* subsystems.
*/
def newConfiguration(conf: SparkConf): Configuration = {
- val hadoopConf = new Configuration()
-
- // Note: this null check is around more than just access to the "conf" object to maintain
- // the behavior of the old implementation of this code, for backwards compatibility.
- if (conf != null) {
- // Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ val hadoopConf = new Configuration()
+
+ // Note: this null check is around more than just access to the "conf" object to maintain
+ // the behavior of the old implementation of this code, for backwards compatibility.
+ if (conf != null) {
+ // Explicitly check for S3 environment variables
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- }
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- conf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ }
+ // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+ conf.getAll.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ }
}
+ val bufferSize = conf.get("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
}
- val bufferSize = conf.get("spark.buffer.size", "65536")
- hadoopConf.set("io.file.buffer.size", bufferSize)
+ hadoopConf
}
-
- hadoopConf
}
/**
@@ -186,6 +187,11 @@ class SparkHadoopUtil extends Logging {
}
object SparkHadoopUtil {
+ /**
+ * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+ * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+ */
+ val CONFIGURATION_INSTANTIATION_LOCK = new Object()
private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a157e36e2286e..d6559deefad0e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -144,7 +144,7 @@ class HadoopRDD[K, V](
// clone can be very expensive. To avoid unexpected performance regressions for workloads and
// Hadoop versions that do not suffer from these thread-safety issues, this cloning is
// disabled by default.
- HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
if (!conf.isInstanceOf[JobConf]) {
@@ -164,7 +164,7 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
- HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
@@ -322,12 +322,6 @@ class HadoopRDD[K, V](
}
private[spark] object HadoopRDD extends Logging {
- /**
- * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
- * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
- */
- val CONFIGURATION_INSTANTIATION_LOCK = new Object()
-
/** Update the input bytes read metric each time this number of records has been read */
val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256
diff --git a/docs/configuration.md b/docs/configuration.md
index 64aa94f622afa..c621f2cd4b9c0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -703,6 +703,24 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
+
+ spark.executor.heartbeatInterval |
+ 10000 |
+ Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
+ the driver know that the executor is still alive and update it with metrics for in-progress
+ tasks. |
+
+
+
+#### Hadoop
+
+
+| Property Name | Default | Meaning |
+
+ spark.hadoop.[HadoopConfigVariable] |
+ (none) |
+ All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration. |
+
spark.hadoop.validateOutputSpecs |
true |
@@ -720,15 +738,9 @@ Apart from these, the following properties are also available, and may be useful
This is disabled by default in order to avoid unexpected performance regressions for jobs that
are not affected by these issues.
-
- spark.executor.heartbeatInterval |
- 10000 |
- Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
- the driver know that the executor is still alive and update it with metrics for in-progress
- tasks. |
-
+
#### Networking
| Property Name | Default | Meaning |
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 31cc4170aa867..24c7a6776aefa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): SchemaRDD = {
+ conf: Configuration = sparkContext.hadoopConfiguration): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e05b029..b3eef291bd8fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
- conf: Configuration = new Configuration()): JavaSchemaRDD = {
+ conf: Configuration = sqlContext.sparkContext.hadoopConfiguration): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ecab5510a8e7b..92b879937930a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -104,7 +105,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(path, new Configuration)
+ def this(path: String) = this(path, SparkHadoopUtil.get.conf)
if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
@@ -545,7 +546,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
- hadoopConf: Configuration = new Configuration(),
+ hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index bb44b906d7386..608b6d7768ebf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = new Configuration) {
+ conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8695b8e05962..46b9a620d32e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(new StreamingContext(path, new Configuration))
+ def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
/**
* Re-creates a JavaStreamingContext from a checkpoint file.