27 changes: 20 additions & 7 deletions docs/configuration.md
@@ -247,6 +247,7 @@ Apart from these, the following properties are also available, and may be useful
The results will be dumped as a separate file for each RDD. They can be loaded
by pstats.Stats(). If this is specified, the profile result will not be displayed
automatically.
</td>
</tr>
<tr>
<td><code>spark.python.worker.reuse</code></td>
@@ -653,6 +654,24 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
</td>
</tr>
<tr>
<td><code>spark.executor.heartbeatInterval</code></td>
<td>10000</td>
<td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
the driver know that the executor is still alive and update it with metrics for in-progress
tasks.</td>
</tr>
</table>

#### Hadoop

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.hadoop.[HadoopConfigVariable]</code></td>
<td>(none)</td>
<td>All properties in <code>spark.hadoop.*</code> will be copied into the Hadoop <code>Configuration</code> object used by a SparkContext. For instance, setting <code>spark.hadoop.fs.s3.awsAccessKeyId</code> will set <code>fs.s3.awsAccessKeyId</code> on the Hadoop configuration.</td>
</tr>
<tr>
<td><code>spark.hadoop.validateOutputSpecs</code></td>
<td>true</td>
@@ -670,15 +689,9 @@ Apart from these, the following properties are also available, and may be useful
This is disabled by default in order to avoid unexpected performance regressions for jobs that
are not affected by these issues.</td>
</tr>
<tr>
<td><code>spark.executor.heartbeatInterval</code></td>
<td>10000</td>
<td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
the driver know that the executor is still alive and update it with metrics for in-progress
tasks.</td>
</tr>
</table>


#### Networking
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
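The `spark.hadoop.*` forwarding documented above is easy to check directly against a `SparkContext`. A minimal sketch, assuming a local Spark build on the classpath; the key name and value are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Any property prefixed with "spark.hadoop." is copied (prefix stripped)
// into the Hadoop Configuration that the SparkContext exposes.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("hadoop-props-demo")
  .set("spark.hadoop.fs.s3.awsAccessKeyId", "PLACEHOLDER_KEY")

val sc = new SparkContext(conf)
println(sc.hadoopConfiguration.get("fs.s3.awsAccessKeyId")) // PLACEHOLDER_KEY
sc.stop()
```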
SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): SchemaRDD = {
conf: Configuration = SparkHadoopUtil.get.conf): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
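The change above swaps the default `conf` from a bare `new Configuration()` to `SparkHadoopUtil.get.conf`. A sketch of the practical difference, assuming `spark.hadoop.fs.s3.awsAccessKeyId` was set on the application's SparkConf (and not in Hadoop's own config files), and that `SparkHadoopUtil` is accessible from user code as a developer API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.SparkHadoopUtil

// A fresh Configuration knows nothing about spark.hadoop.* settings...
val fresh = new Configuration()
fresh.get("fs.s3.awsAccessKeyId")        // null -- the Spark-side setting is lost

// ...whereas SparkHadoopUtil builds its Configuration from Spark properties,
// so createParquetFile now sees the same Hadoop settings as the rest of the job.
val sparkAware = SparkHadoopUtil.get.conf
sparkAware.get("fs.s3.awsAccessKeyId")   // the configured value
```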
JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): JavaSchemaRDD = {
conf: Configuration = SparkHadoopUtil.get.conf): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
StreamingContext.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -104,7 +105,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(path, new Configuration)
def this(path: String) = this(path, SparkHadoopUtil.get.conf)

if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
@@ -536,7 +537,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try {
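With the new default, callers of `getOrCreate` that omit `hadoopConf` now read checkpoints with a Configuration that carries any `spark.hadoop.*` overrides. A usage sketch; the checkpoint path and batch interval are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///tmp/app-checkpoint") // placeholder path
  ssc
}

// No explicit hadoopConf argument: the default (now SparkHadoopUtil.get.conf)
// is used to locate and read the checkpoint, so spark.hadoop.* file-system
// settings apply to recovery as well.
val ssc = StreamingContext.getOrCreate("hdfs:///tmp/app-checkpoint", createContext _)
```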
JavaPairDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -770,7 +771,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = new Configuration) {
conf: Configuration = SparkHadoopUtil.get.conf) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}

JavaStreamingContext.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(new StreamingContext(path, new Configuration))
def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))

/**
* Re-creates a JavaStreamingContext from a checkpoint file.
FileInputDStream.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -137,7 +138,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
}

private def fs: FileSystem = {
if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
if (fs_ == null) fs_ = directoryPath.getFileSystem(SparkHadoopUtil.get.conf)
fs_
}

PairDStreamFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
@@ -657,7 +658,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = new Configuration
conf: Configuration = SparkHadoopUtil.get.conf
) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
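The same reasoning applies at the streaming save path: omitting `conf` in `saveAsNewAPIHadoopFiles` previously built a fresh `Configuration` on every call. A sketch of the call site; the socket source, output path, and suffix are placeholders:

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits

val ssc = new StreamingContext(
  new SparkConf().setMaster("local[2]").setAppName("save-demo"), Seconds(10))

val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (new Text(line), new Text(line)))

// `conf` is omitted, so the default -- now SparkHadoopUtil.get.conf -- is used
// and spark.hadoop.* settings reach the output format.
pairs.saveAsNewAPIHadoopFiles(
  "hdfs:///tmp/out", "txt",
  classOf[Text], classOf[Text],
  classOf[TextOutputFormat[Text, Text]])

// ssc.start() would begin writing one directory of files per batch.
```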