46 changes: 26 additions & 20 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -79,30 +79,31 @@ class SparkHadoopUtil extends Logging {
   * subsystems.
   */
  def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new Configuration()
-
-    // Note: this null check is around more than just access to the "conf" object to maintain
-    // the behavior of the old implementation of this code, for backwards compatibility.
-    if (conf != null) {
-      // Explicitly check for S3 environment variables
-      if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-          System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-        hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-        hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-        hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-        hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      }
-      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-      conf.getAll.foreach { case (key, value) =>
-        if (key.startsWith("spark.hadoop.")) {
-          hadoopConf.set(key.substring("spark.hadoop.".length), value)
-        }
-      }
-      val bufferSize = conf.get("spark.buffer.size", "65536")
-      hadoopConf.set("io.file.buffer.size", bufferSize)
-    }
-
-    hadoopConf
+    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      val hadoopConf = new Configuration()
+
+      // Note: this null check is around more than just access to the "conf" object to maintain
+      // the behavior of the old implementation of this code, for backwards compatibility.
+      if (conf != null) {
+        // Explicitly check for S3 environment variables
+        if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+            System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+          hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+          hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+          hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+          hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+        }
+        // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+        conf.getAll.foreach { case (key, value) =>
+          if (key.startsWith("spark.hadoop.")) {
+            hadoopConf.set(key.substring("spark.hadoop.".length), value)
+          }
+        }
+        val bufferSize = conf.get("spark.buffer.size", "65536")
+        hadoopConf.set("io.file.buffer.size", bufferSize)
+      }
+
+      hadoopConf
+    }
  }

/**
@@ -186,6 +187,11 @@ class SparkHadoopUtil extends Logging {
}

object SparkHadoopUtil {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()

private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
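The rest of the PR replaces bare `new Configuration()` defaults with either the SparkContext's shared Hadoop configuration or the SparkHadoopUtil singleton. A rough sketch of that call-site pattern, using a hypothetical helper class (nothing below is taken from the actual Spark sources):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical helper showing the default-argument pattern this PR moves toward.
class ConfDefaultSketch(sc: SparkContext) {
  // Before: a blank Configuration that ignores spark.hadoop.* and the S3 env variables.
  def loadBefore(conf: Configuration = new Configuration()): Configuration = conf

  // After: default to the shared, SparkConf-derived configuration on the SparkContext.
  def loadAfter(conf: Configuration = sc.hadoopConfiguration): Configuration = conf
}

object ConfDefaultSketch {
  // Where no SparkContext is in scope (e.g. recreating a StreamingContext from a
  // checkpoint path), the PR falls back to the SparkHadoopUtil singleton instead.
  def loadStandalone(conf: Configuration = SparkHadoopUtil.get.conf): Configuration = conf
}
```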
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -144,7 +144,7 @@ class HadoopRDD[K, V](
// clone can be very expensive. To avoid unexpected performance regressions for workloads and
// Hadoop versions that do not suffer from these thread-safety issues, this cloning is
// disabled by default.
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
if (!conf.isInstanceOf[JobConf]) {
@@ -164,7 +164,7 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
@@ -322,12 +322,6 @@ class HadoopRDD[K, V](
}

private[spark] object HadoopRDD extends Logging {
-  /**
-   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
-   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
-   */
-  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
-
/** Update the input bytes read metric each time this number of records has been read */
val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256

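The comment in the hunk above describes the other half of the scheme: JobConfs are cached per process and only built under the instantiation lock. A minimal sketch of that pattern, with a plain ConcurrentHashMap standing in for HadoopRDD's real metadata cache (HadoopRDD.putCachedMetadata); the lock and key names here are illustrative:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfCacheSketch {
  private val cache = new ConcurrentHashMap[String, JobConf]()
  private val lock = new Object() // plays the role of CONFIGURATION_INSTANTIATION_LOCK

  def getOrCreate(key: String, base: Configuration): JobConf = {
    val cached = cache.get(key)
    if (cached != null) {
      cached
    } else {
      // Guard JobConf construction: Configuration's constructor touches shared static
      // state and is not thread-safe on older Hadoop versions (HADOOP-10456).
      lock.synchronized {
        val newJobConf = new JobConf(base)
        val existing = cache.putIfAbsent(key, newJobConf)
        if (existing != null) existing else newJobConf
      }
    }
  }
}
```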
26 changes: 19 additions & 7 deletions docs/configuration.md
@@ -703,6 +703,24 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
</td>
</tr>
+<tr>
+  <td><code>spark.executor.heartbeatInterval</code></td>
Contributor:
Unrelated to this PR?

Contributor Author:
Pretty sure that's just diff getting confused based on where the hadoop doc changes were inserted; the same lines are marked as removed lower in the diff.

+  <td>10000</td>
+  <td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
+    the driver know that the executor is still alive and update it with metrics for in-progress
+    tasks.</td>
+</tr>
+</table>
+
+#### Hadoop
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.hadoop.[HadoopConfigVariable]</code></td>
+  <td>(none)</td>
+  <td>All properties in spark.hadoop.* will be copied into the Hadoop <code>Configuration</code> object used by a SparkContext. For instance, setting spark.hadoop.fs.s3.awsAccessKeyId will set fs.s3.awsAccessKeyId on the Hadoop configuration.</td>
+</tr>
<tr>
<td><code>spark.hadoop.validateOutputSpecs</code></td>
<td>true</td>
@@ -720,15 +738,9 @@ Apart from these, the following properties are also available, and may be useful
This is disabled by default in order to avoid unexpected performance regressions for jobs that
are not affected by these issues.</td>
</tr>
-<tr>
-  <td><code>spark.executor.heartbeatInterval</code></td>
-  <td>10000</td>
-  <td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let
-    the driver know that the executor is still alive and update it with metrics for in-progress
-    tasks.</td>
-</tr>
</table>


#### Networking
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
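The spark.hadoop.* rule documented in the new Hadoop table above can be set like any other Spark property; a minimal programmatic sketch, with the master, app name, key, and value all chosen for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkHadoopPropsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                           // illustrative master
      .setAppName("spark-hadoop-props-sketch")
      .set("spark.hadoop.fs.s3.awsAccessKeyId", "XXX") // illustrative value

    val sc = new SparkContext(conf)
    // The "spark.hadoop." prefix is stripped when the property lands in the
    // Hadoop Configuration used by this SparkContext.
    println(sc.hadoopConfiguration.get("fs.s3.awsAccessKeyId")) // prints "XXX"
    sc.stop()
  }
}
```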
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
Contributor:
I think this should be using the "hadoopConfiguration" object in the SparkContext. That has all the hadoop related configuration already set up and should be what is automatically used. @marmbrus should have a better idea.

Contributor Author:
I seem to recall there being potential thread safety issues related to Hadoop configuration objects, resulting in the need to create / clone them.

Quick search turned up e.g. https://issues.apache.org/jira/browse/SPARK-2546

I'm not sure how relevant that is to all of these existing situations where new Configuration() is being called.


Contributor:
@koeninger The issue that you linked is concerned with thread-safety issues when multiple threads concurrently modify the same Configuration instance.

It turns out that there's another, older thread-safety issue related to Configuration's constructor not being thread-safe due to non-thread-safe static state: https://issues.apache.org/jira/browse/HADOOP-10456. This has been fixed in some newer Hadoop releases, but since it was only reported in April I don't think we can ignore it. As a result, https://issues.apache.org/jira/browse/SPARK-1097 implements a workaround which synchronizes on an object before calling new Configuration. Currently, I think the extra synchronization logic is only implemented in HadoopRDD, but it should probably be used everywhere just to be safe. I think that HadoopRDD was the "highest-risk" place where we might have many threads creating Configurations at the same time, which is probably why that patch's author didn't add the synchronization everywhere.

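A minimal sketch of the SPARK-1097-style workaround described in the comment above: many threads may build Configuration instances at once, so construction is funneled through one shared lock. The thread count, lock name, and printout are illustrative.

```scala
import org.apache.hadoop.conf.Configuration

// On Hadoop versions affected by HADOOP-10456, concurrent Configuration construction
// can fail with ConcurrentModificationException; serializing the constructor calls
// through a shared lock avoids it.
object ConstructorLockSketch {
  private val constructorLock = new Object()

  def main(args: Array[String]): Unit = {
    val threads = (1 to 8).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          val conf = constructorLock.synchronized { new Configuration() }
          println(s"thread $i loaded ${conf.size()} default entries")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```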
Contributor Author:
So let me see if I have things straight:

- Currently, the code is using new Configuration() as a default, which may have some thread safety issues due to the constructor.
- My original patch uses SparkHadoopUtil.get.conf, which is a singleton, so it should decrease the constructor thread safety problem but increase the problems if the hadoop configuration is modified. It also won't do the right thing for people who have altered the sparkConf, which makes it no good (I haven't run into this in personal usage of the patched version, because I always pass in a complete sparkConf via properties rather than setting it in code).
- @tdas suggested using this.sparkContext.hadoopConfiguration. This will use the "right" spark config, but may have thread safety issues both at construction time, when the spark context is created, and if the configuration is modified.

So....

Use tdas' suggestion, and add a HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized block to SparkHadoopUtil.newConfiguration? And people are out of luck if they have code that used to work because they were modifying new blank instances of Configuration, rather than the now-shared one?

Contributor:
If we're going to use CONFIGURATION_INSTANTIATION_LOCK in multiple places, then I think it makes sense to move CONFIGURATION_INSTANTIATION_LOCK into SparkHadoopUtil, since that seems like a more logical place for it to live than HadoopRDD. I like the idea of hiding the synchronization logic behind a method like SparkHadoopUtil.newConfiguration.

Regarding whether SparkContext.hadoopConfiguration will lead to thread-safety issues: I did a bit of research on this while developing a workaround for the other configuration thread-safety issues and wrote a series of comments citing cases of code "in the wild" that depend on mutating SparkContext.hadoopConfiguration. For example, there are a lot of snippets of code that look like this:

sc.hadoopConfiguration.set("es.resource", "syslog/entry")
output.saveAsHadoopFile[ESOutputFormat]("-")

In Spark 1.x, I don't think we'll be able to safely transition away from using the shared SparkContext.hadoopConfiguration instance since there's so much existing code that relies on the current behavior.

However, I think that there's much less risk of running into thread-safety issues as a result of this. It seems fairly unlikely that you'll have multiple threads mutating the shared configuration in the driver JVM. In executor JVMs, most Hadoop InputFormats (and other classes) don't mutate configurations, so we shouldn't run into issues; for those that do mutate, users can always enable the cloneConf setting.

In a nutshell, I don't think that the shared sc.hadoopConfiguration is a good design that we would choose if we were redesigning it, but using it here seems consistent with the behavior that we have elsewhere in Spark as long as we're stuck with this for 1.x.

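For jobs whose input formats do mutate a shared JobConf, the cloneConf escape hatch mentioned above can be switched on per application; a minimal sketch, assuming the property is spelled spark.hadoop.cloneConf as in the docs table, with the master, app name, and input path chosen for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CloneConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")           // illustrative master
      .setAppName("clone-conf-sketch")
      // Ask HadoopRDD to hand each task its own cloned JobConf instead of the
      // shared, cached one; trades extra cloning cost for isolation.
      .set("spark.hadoop.cloneConf", "true")

    val sc = new SparkContext(conf)
    val count = sc.textFile("hdfs:///tmp/input").count() // illustrative path
    println(count)
    sc.stop()
  }
}
```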
Contributor:
> And people are out of luck if they have code that used to work because they were modifying new blank instances of Configuration, rather than the now-shared one?

I don't think that users were able to access the old new Configuration() instance; I think that the only code that could have modified this would be the Parquet code.

-      conf: Configuration = new Configuration()): SchemaRDD = {
+      conf: Configuration = sparkContext.hadoopConfiguration): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -84,7 +85,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
Contributor:
Same comment as I made in SQLContext

-      conf: Configuration = new Configuration()): JavaSchemaRDD = {
+      conf: Configuration = sqlContext.sparkContext.hadoopConfiguration): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -104,7 +105,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
-  def this(path: String) = this(path, new Configuration)
+  def this(path: String) = this(path, SparkHadoopUtil.get.conf)

if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
@@ -545,7 +546,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
Contributor:
I approve this change.

-      hadoopConf: Configuration = new Configuration(),
+      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try {
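With the new default in getOrCreate, checkpoint recovery no longer needs a caller-supplied Hadoop Configuration; a minimal usage sketch, with the checkpoint path, batch interval, and app name chosen for illustration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GetOrCreateSketch {
  private val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

  private def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("get-or-create-sketch") // illustrative app name
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    ssc
  }

  def main(args: Array[String]): Unit = {
    // No Hadoop Configuration is passed: the hadoopConf parameter now defaults to
    // SparkHadoopUtil.get.conf (built from spark.* properties, e.g. spark.hadoop.*
    // entries passed via system properties) instead of a bare new Configuration().
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```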
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
Contributor:
This should also be the configuration from the sparkContext.hadoopConfiguration

Contributor:
The scope of this PR is pretty wide in terms of the number of classes it touches, causing issues as different places need to be handled differently. If you considered moving this sort of change (new Configuration to sparkContext.hadoopConfiguration) into a different PR, that might be easier to get in.

Contributor Author:
Based on what Marcelo Vanzin said on the dev list when I brought this issue up, the only reason the problem was still around for me to run into is because he changed some of the uses of new Configuration but not all of them.

I agree it's used in a lot of different places, but I'm not sure how piecemeal fixes to only some of the places are helpful to users. Were there still specific concerns about particular classes?


-      conf: Configuration = new Configuration) {
+      conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -133,7 +134,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
-  def this(path: String) = this(new StreamingContext(path, new Configuration))
+  def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))

/**
* Re-creates a JavaStreamingContext from a checkpoint file.