**core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala** (3 additions, 2 deletions)

```diff
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
```
**core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala** (31 additions, 6 deletions)

```diff
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
```

```diff
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
```
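A note on how the new flag is consumed: when `spark.hadoop.cloneConf` is on, `getConf` clones on every call, which is presumably why the `getPartitions` implementations in this patch (here and in `BinaryFileRDD`/`WholeTextFileRDD`, both of which extend `NewHadoopRDD`) hoist `getConf` into a local `val conf` instead of calling it three times. A minimal sketch of opting in, assuming an otherwise standard Spark setup; the master and app name are illustrative, and only the `spark.hadoop.cloneConf` key comes from this diff:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable defensive cloning of the Hadoop Configuration in getConf.
// Off by default because each clone costs CPU and allocations per call.
val sparkConf = new SparkConf()
  .setMaster("local[*]")              // illustrative
  .setAppName("clone-conf-example")   // illustrative
  .set("spark.hadoop.cloneConf", "true")

val sc = new SparkContext(sparkConf)
```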
```diff
@@ -104,7 +124,7 @@
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,11 +250,15 @@
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
```
Review thread on `CONFIGURATION_INSTANTIATION_LOCK`:

**Contributor:** If HADOOP-10456 is still an issue, then I think we need to lock on the same lock in both NewHadoopRDD and HadoopRDD. Therefore, I'd be in favor of moving this lock to SparkHadoopUtil or a similar location, then updating both RDD implementations to use that lock.
**Contributor (author):** HADOOP-10456 is only fixed as of Hadoop 2.4.1 (https://issues.apache.org/jira/browse/HADOOP-10456), so people on earlier versions will still hit the issue in some configurations.

The concurrency issue with the Configuration/JobConf instantiation seems to be per object, not per class (i.e., it happens when two threads try to copy the same conf object, not when two threads instantiate Configuration/JobConf from two separate conf objects). The lock could even have been allocated per NewHadoopRDD or HadoopRDD instance.

So I think this should be safe, although I can certainly move this to SparkHadoopUtil for clarity. Please let me know what you think!
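To make the per-object claim concrete, here is a sketch of the racy pattern (thread and iteration counts are arbitrary, and this is illustrative rather than a deterministic reproducer; per HADOOP-10456, concurrent copies of the same Configuration on affected Hadoop versions can fail or produce corrupted copies):

```scala
import org.apache.hadoop.conf.Configuration

val shared = new Configuration()

// Racy: several threads copy the *same* Configuration with no synchronization.
// Concurrent copies of two *separate* Configuration objects are not affected.
val threads = (1 to 4).map { _ =>
  new Thread(new Runnable {
    override def run(): Unit =
      (1 to 1000).foreach(_ => new Configuration(shared))
  })
}
threads.foreach(_.start())
threads.foreach(_.join())
```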

**Contributor:** Ah, right. In the original version of the fix for SPARK-1097, we actually synchronized on the Configuration being cloned. It looks like the HadoopRDD object lock was added in a follow-up patch to avoid a deadlock caused by that synchronization: #1409

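For clarity, a sketch contrasting the two locking strategies discussed in this thread; the `ConfCloning` object and the `cloneWith*` helper names are hypothetical, and only `CONFIGURATION_INSTANTIATION_LOCK` appears in the patch:

```scala
import org.apache.hadoop.conf.Configuration

object ConfCloning { // hypothetical container, for illustration only
  private val CONFIGURATION_INSTANTIATION_LOCK = new Object()

  // Strategy taken by this patch: all clones share one dedicated lock object.
  def cloneWithSharedLock(conf: Configuration): Configuration =
    CONFIGURATION_INSTANTIATION_LOCK.synchronized(new Configuration(conf))

  // Strategy in the original SPARK-1097 fix: lock the instance being copied.
  // Finer-grained, but it was replaced after causing a deadlock (see #1409).
  def cloneWithInstanceLock(conf: Configuration): Configuration =
    conf.synchronized(new Configuration(conf))
}
```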

```diff
 /**
  * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
  * the given function rather than the index of the partition.
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
```