From 3cd384f77ba9505fe7c94c82980e07044f6b128c Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Tue, 4 Nov 2014 16:40:17 -0600
Subject: [PATCH 1/3] SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are copied from spark config

---
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala     | 3 ++-
 .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 3 ++-
 .../scala/org/apache/spark/streaming/StreamingContext.scala  | 5 +++--
 .../apache/spark/streaming/api/java/JavaPairDStream.scala    | 3 ++-
 .../spark/streaming/api/java/JavaStreamingContext.scala      | 3 ++-
 .../apache/spark/streaming/dstream/FileInputDStream.scala    | 3 ++-
 .../spark/streaming/dstream/PairDStreamFunctions.scala       | 3 ++-
 7 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 84eaf401f240..42164797d6e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def createParquetFile[A <: Product : TypeTag](
       path: String,
       allowExisting: Boolean = true,
-      conf: Configuration = new Configuration()): SchemaRDD = {
+      conf: Configuration = SparkHadoopUtil.get.conf): SchemaRDD = {
     new SchemaRDD(
       this,
       ParquetRelation.createEmpty(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e05b02..096e6f59f06b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.{SQLContext, StructType => SStructType}
 import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
       beanClass: Class[_],
       path: String,
       allowExisting: Boolean = true,
-      conf: Configuration = new Configuration()): JavaSchemaRDD = {
+      conf: Configuration = SparkHadoopUtil.get.conf): JavaSchemaRDD = {
     new JavaSchemaRDD(
       sqlContext,
       ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 23d6d1c5e50f..599b024c035d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
@@ -104,7 +105,7 @@ class StreamingContext private[streaming] (
    * Recreate a StreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    */
-  def this(path: String) = this(path, new Configuration)
+  def this(path: String) = this(path, SparkHadoopUtil.get.conf)
 
   if (sc_ == null && cp_ == null) {
     throw new Exception("Spark Streaming cannot be initialized with " +
@@ -536,7 +537,7 @@ object StreamingContext extends Logging {
   def getOrCreate(
       checkpointPath: String,
       creatingFunc: () => StreamingContext,
-      hadoopConf: Configuration = new Configuration(),
+      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
       createOnError: Boolean = false
     ): StreamingContext = {
     val checkpointOption = try {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 59d4423086ef..6b744b8bbabe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -770,7 +771,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration) {
+      conf: Configuration = SparkHadoopUtil.get.conf) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7db66c69a6d7..cf17f5ccbc7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Recreate a JavaStreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    */
-  def this(path: String) = this(new StreamingContext(path, new Configuration))
+  def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
 
   /**
    * Re-creates a JavaStreamingContext from a checkpoint file.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8152b7542ac5..05d67e9bc251 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
@@ -137,7 +138,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
+    if (fs_ == null) fs_ = directoryPath.getFileSystem(SparkHadoopUtil.get.conf)
     fs_
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 9467595d307a..a6f3748c347c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.StreamingContext._
 
 import org.apache.spark.{Partitioner, HashPartitioner}
 import org.apache.spark.SparkContext._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable.ArrayBuffer
@@ -657,7 +658,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = SparkHadoopUtil.get.conf
     ) {
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)

From f2ee4f9f1ed717d54fb7916ff2cf3ae85468eab0 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Tue, 4 Nov 2014 16:41:07 -0600
Subject: [PATCH 2/3] SPARK-4229 document handling of spark.hadoop.* properties

---
 docs/configuration.md | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 685101ea5c9c..72a4cc01f455 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -653,6 +653,21 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.heartbeatInterval</code></td>
+  <td>10000</td>
+  <td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
+    the driver know that the executor is still alive and update it with metrics for in-progress
+    tasks.</td>
+</tr>
+</table>
+
+#### Hadoop
+
+All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration.
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.hadoop.validateOutputSpecs</code></td>
   <td>true</td>
@@ -670,15 +685,9 @@ Apart from these, the following properties are also available, and may be useful
     This is disabled by default in order to avoid unexpected performance regressions for jobs that
     are not affected by these issues.</td>
 </tr>
-<tr>
-  <td><code>spark.executor.heartbeatInterval</code></td>
-  <td>10000</td>
-  <td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
-    the driver know that the executor is still alive and update it with metrics for in-progress
-    tasks.</td>
-</tr>
 </table>
 
+
 #### Networking
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>

From eebbdcc53caa214079612732d3a4a13e57cecffe Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Tue, 4 Nov 2014 21:26:26 -0600
Subject: [PATCH 3/3] SPARK-4229 fix broken table in documentation, make hadoop doc formatting match that of runtime env

---
 docs/configuration.md | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 72a4cc01f455..0eca23f94505 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -247,6 +247,7 @@ Apart from these, the following properties are also available, and may be useful
     The results will be dumped as separated file for each RDD. They can be loaded
     by ptats.Stats(). If this is specified, the profile result will not be displayed
     automatically.
+  </td>
 </tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
@@ -664,10 +665,13 @@ Apart from these, the following properties are also available, and may be useful
 
 #### Hadoop
 
-All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration.
-
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.hadoop.[HadoopConfigVariable]</code></td>
+  <td>(none)</td>
+  <td>All properties in spark.hadoop.* will be copied into the Hadoop Configuration object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration.</td>
+</tr>
 <tr>
   <td><code>spark.hadoop.validateOutputSpecs</code></td>
   <td>true</td>
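
A minimal usage sketch of the behavior documented above, for reference only (not part of the patch series; the object name, master URL, and property value are placeholders). Properties set on a SparkConf under the spark.hadoop.* prefix are copied, with the prefix stripped, into the Hadoop Configuration built from that SparkConf, which is what the SparkHadoopUtil.get.conf defaults in PATCH 1/3 are intended to surface in place of a freshly constructed new Configuration():

import org.apache.spark.{SparkConf, SparkContext}

object SparkHadoopPropsSketch {
  def main(args: Array[String]): Unit = {
    // fs.s3.awsAccessKeyId is the same example property used in the docs change above;
    // the value here is a placeholder, not a real credential.
    val conf = new SparkConf()
      .setAppName("spark-hadoop-props-sketch")
      .setMaster("local[*]")
      .set("spark.hadoop.fs.s3.awsAccessKeyId", "EXAMPLE_ACCESS_KEY_ID")

    val sc = new SparkContext(conf)
    // The spark.hadoop. prefix is stripped when the value is copied into the
    // Hadoop Configuration derived from this application's SparkConf.
    println(sc.hadoopConfiguration.get("fs.s3.awsAccessKeyId")) // EXAMPLE_ACCESS_KEY_ID
    sc.stop()
  }
}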