From c41a4b4d0752b1a5b057611c796e367c5a806be6 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 4 Nov 2014 16:40:17 -0600 Subject: [PATCH 1/4] SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are copied from spark config Resolved conflicts in favor of master. Conflicts: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala --- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 ++- .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 3 ++- .../scala/org/apache/spark/streaming/StreamingContext.scala | 5 +++-- .../apache/spark/streaming/api/java/JavaPairDStream.scala | 3 ++- .../spark/streaming/api/java/JavaStreamingContext.scala | 3 ++- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 31cc4170aa867..5f509e4cbac13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis._ @@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def createParquetFile[A <: Product : TypeTag]( path: String, allowExisting: Boolean = true, - conf: Configuration = new Configuration()): SchemaRDD = { + conf: Configuration = SparkHadoopUtil.get.conf): SchemaRDD = { new SchemaRDD( this, ParquetRelation.createEmpty( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 4c0869e05b029..096e6f59f06b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{SQLContext, StructType => SStructType} import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow} @@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { beanClass: Class[_], path: String, allowExisting: Boolean = true, - conf: Configuration = new Configuration()): JavaSchemaRDD = { + conf: Configuration = SparkHadoopUtil.get.conf): JavaSchemaRDD = { new JavaSchemaRDD( sqlContext, ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ecab5510a8e7b..92b879937930a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -32,6 +32,7 @@ import 
org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ @@ -104,7 +105,7 @@ class StreamingContext private[streaming] ( * Recreate a StreamingContext from a checkpoint file. * @param path Path to the directory that was specified as the checkpoint directory */ - def this(path: String) = this(path, new Configuration) + def this(path: String) = this(path, SparkHadoopUtil.get.conf) if (sc_ == null && cp_ == null) { throw new Exception("Spark Streaming cannot be initialized with " + @@ -545,7 +546,7 @@ object StreamingContext extends Logging { def getOrCreate( checkpointPath: String, creatingFunc: () => StreamingContext, - hadoopConf: Configuration = new Configuration(), + hadoopConf: Configuration = SparkHadoopUtil.get.conf, createOnError: Boolean = false ): StreamingContext = { val checkpointOption = try { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index bb44b906d7386..c4ff005abad6f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils} import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ @@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = new Configuration) { + conf: Configuration = SparkHadoopUtil.get.conf) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index d8695b8e05962..46b9a620d32e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ @@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Recreate a JavaStreamingContext from a checkpoint file. 
* @param path Path to the directory that was specified as the checkpoint directory */ - def this(path: String) = this(new StreamingContext(path, new Configuration)) + def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf)) /** * Re-creates a JavaStreamingContext from a checkpoint file. From b48ad63fb9c31de90c8b5b0541129e2c71bd3478 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 4 Nov 2014 16:41:07 -0600 Subject: [PATCH 2/4] SPARK-4229 document handling of spark.hadoop.* properties --- docs/configuration.md | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 0b77f5ab645c9..8346bbe082a9c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -663,6 +663,21 @@ Apart from these, the following properties are also available, and may be useful this duration will be cleared as well. + + spark.executor.heartbeatInterval + 10000 + Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let + the driver know that the executor is still alive and update it with metrics for in-progress + tasks. + + + +#### Hadoop + +All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration. + + + @@ -680,15 +695,9 @@ Apart from these, the following properties are also available, and may be useful This is disabled by default in order to avoid unexpected performance regressions for jobs that are not affected by these issues. - - - - -
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
  <td><code>spark.hadoop.validateOutputSpecs</code></td>
  <td>true</td>
- <td><code>spark.executor.heartbeatInterval</code></td>
- <td>10000</td>
- <td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
-   the driver know that the executor is still alive and update it with metrics for in-progress
-   tasks.</td>
+ #### Networking From 413f916bafc5b218ab334cb9d66b67f3dbc117f7 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 4 Nov 2014 21:26:26 -0600 Subject: [PATCH 3/4] SPARK-4229 fix broken table in documentation, make hadoop doc formatting match that of runtime env --- docs/configuration.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 8346bbe082a9c..a809bf352ab1d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -254,6 +254,7 @@ Apart from these, the following properties are also available, and may be useful The results will be dumped as separated file for each RDD. They can be loaded by ptats.Stats(). If this is specified, the profile result will not be displayed automatically. + @@ -674,10 +675,13 @@ Apart from these, the following properties are also available, and may be useful #### Hadoop -All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration. -
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
  <td><code>spark.python.worker.reuse</code></td>
+ + + + + From bfc550ef0b7b535adb0aa019f30dd4771c24aece Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 16 Dec 2014 16:49:41 -0600 Subject: [PATCH 4/4] [SPARK-4229] per tdas and JoshRosen, use sparkContext.hadoopConfiguration where possible, synchronize new Configuration --- .../apache/spark/deploy/SparkHadoopUtil.scala | 46 +++++++++++-------- .../org/apache/spark/rdd/HadoopRDD.scala | 10 +--- .../org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/api/java/JavaSQLContext.scala | 2 +- .../streaming/api/java/JavaPairDStream.scala | 2 +- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 60ee115e393ce..8be5d53aa7b3c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -79,30 +79,31 @@ class SparkHadoopUtil extends Logging { * subsystems. */ def newConfiguration(conf: SparkConf): Configuration = { - val hadoopConf = new Configuration() - - // Note: this null check is around more than just access to the "conf" object to maintain - // the behavior of the old implementation of this code, for backwards compatibility. - if (conf != null) { - // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && + SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized { + val hadoopConf = new Configuration() + + // Note: this null check is around more than just access to the "conf" object to maintain + // the behavior of the old implementation of this code, for backwards compatibility. + if (conf != null) { + // Explicitly check for S3 environment variables + if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { - hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - } - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) + hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + } + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + conf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } } + val bufferSize = conf.get("spark.buffer.size", "65536") + hadoopConf.set("io.file.buffer.size", bufferSize) } - val bufferSize = conf.get("spark.buffer.size", "65536") - hadoopConf.set("io.file.buffer.size", bufferSize) + hadoopConf } - - hadoopConf } /** @@ -186,6 +187,11 @@ class SparkHadoopUtil extends Logging { } object SparkHadoopUtil { + /** + * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). + * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration(). 
+ */ + val CONFIGURATION_INSTANTIATION_LOCK = new Object() private val hadoop = { val yarnMode = java.lang.Boolean.valueOf( diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a157e36e2286e..d6559deefad0e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -144,7 +144,7 @@ class HadoopRDD[K, V]( // clone can be very expensive. To avoid unexpected performance regressions for workloads and // Hadoop versions that do not suffer from these thread-safety issues, this cloning is // disabled by default. - HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { + SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized { logDebug("Cloning Hadoop Configuration") val newJobConf = new JobConf(conf) if (!conf.isInstanceOf[JobConf]) { @@ -164,7 +164,7 @@ class HadoopRDD[K, V]( // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456). - HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { + SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized { logDebug("Creating new JobConf and caching it for later re-use") val newJobConf = new JobConf(conf) initLocalJobConfFuncOpt.map(f => f(newJobConf)) @@ -322,12 +322,6 @@ class HadoopRDD[K, V]( } private[spark] object HadoopRDD extends Logging { - /** - * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). - * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration(). - */ - val CONFIGURATION_INSTANTIATION_LOCK = new Object() - /** Update the input bytes read metric each time this number of records has been read */ val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5f509e4cbac13..24c7a6776aefa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -263,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def createParquetFile[A <: Product : TypeTag]( path: String, allowExisting: Boolean = true, - conf: Configuration = SparkHadoopUtil.get.conf): SchemaRDD = { + conf: Configuration = sparkContext.hadoopConfiguration): SchemaRDD = { new SchemaRDD( this, ParquetRelation.createEmpty( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 096e6f59f06b7..b3eef291bd8fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -85,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { beanClass: Class[_], path: String, allowExisting: Boolean = true, - conf: Configuration = SparkHadoopUtil.get.conf): JavaSchemaRDD = { + conf: Configuration = sqlContext.sparkContext.hadoopConfiguration): JavaSchemaRDD = { new JavaSchemaRDD( sqlContext, ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c4ff005abad6f..608b6d7768ebf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -790,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = SparkHadoopUtil.get.conf) { + conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) }
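Patch 4 also moves CONFIGURATION_INSTANTIATION_LOCK out of HadoopRDD and into the SparkHadoopUtil companion object, so JobConf cloning in HadoopRDD and newConfiguration serialize on the same lock. A minimal sketch of that internal convention, assuming a Spark build with this series applied (the ThreadSafeHadoopConf object and its method names are illustrative, not from the patch):

```scala
// Sketch only: the shared-lock convention this series centralizes in SparkHadoopUtil.
// Hadoop's Configuration constructor is not thread-safe (SPARK-1097 / HADOOP-10456),
// so new Configuration() / new JobConf() calls are serialized on one lock.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil

object ThreadSafeHadoopConf {
  // Fresh Hadoop configuration, created under the shared lock.
  def newConf(): Configuration =
    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      new Configuration()
    }

  // JobConf copies of an existing configuration go through the same lock.
  def newJobConf(base: Configuration): JobConf =
    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      new JobConf(base)
    }
}
```

With the lock relocated, HadoopRDD's getJobConf and Configuration-cloning paths in the diff above synchronize on SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK instead of a HadoopRDD-local object.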
The Hadoop rows in docs/configuration.md as they render after patch 3:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.hadoop.[HadoopConfigVariable]</code></td>
  <td>(none)</td>
  <td>
    All properties in spark.hadoop.* will be copied into the Hadoop Configuration object
    used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will
    set fs.s3.awsAccessKeyId on the Hadoop configuration.
  </td>
</tr>
<tr>
  <td><code>spark.hadoop.validateOutputSpecs</code></td>
  <td>true</td>
</tr>
</table>
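The table above describes the behavior the whole series leans on: any spark.hadoop.* entry on a SparkConf is copied, minus the prefix, into the Hadoop Configuration that the SparkContext exposes, and the patched APIs now use that configuration as their default. A minimal usage sketch, assuming a Spark build with this series applied (the app name, key, and value below are placeholders, not from the patch):

```scala
// Usage sketch (not part of the patch): a spark.hadoop.* entry set on SparkConf
// surfaces, with the "spark.hadoop." prefix stripped, in sc.hadoopConfiguration,
// which the changed Parquet and streaming APIs now pick up by default.
import org.apache.spark.{SparkConf, SparkContext}

object SparkHadoopPrefixDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("spark-hadoop-prefix-demo")
      .set("spark.hadoop.fs.s3.awsAccessKeyId", "EXAMPLE_KEY_ID") // placeholder value

    val sc = new SparkContext(conf)
    // The prefix is dropped, so the value is visible under fs.s3.awsAccessKeyId:
    println(sc.hadoopConfiguration.get("fs.s3.awsAccessKeyId")) // prints EXAMPLE_KEY_ID
    sc.stop()
  }
}
```

Before this series, createParquetFile, the checkpoint-recovery constructors, and saveAsNewAPIHadoopFiles defaulted to a bare new Configuration(), which ignored such settings; after it they default to the SparkContext's Hadoop configuration, or to SparkHadoopUtil.get.conf where no SparkContext is available yet (the StreamingContext checkpoint paths).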