@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
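The new "spark.hadoop.cloneConf" flag is read through sparkContext.conf, so it is set like any other Spark property. A minimal sketch of opting in from a driver program (the app name is hypothetical; the flag defaults to false, in which case all tasks in a JVM share the broadcast Configuration instance):

import org.apache.spark.{SparkConf, SparkContext}

// Opt in to per-task cloning of the Hadoop Configuration (disabled by
// default because the clone can be expensive).
val sparkConf = new SparkConf()
  .setAppName("clone-conf-demo") // hypothetical app name
  .set("spark.hadoop.cloneConf", "true")

val sc = new SparkContext(sparkConf)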
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,11 +250,15 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
   /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
    * the given function rather than the index of the partition.
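Since new Configuration() mutates shared static state (HADOOP-10456), any code constructing one concurrently can hit the same race. A minimal sketch of the guarded-construction pattern this lock enables; newGuardedConf is a hypothetical helper, not part of this patch, and because NewHadoopRDD is private[spark] it only compiles inside the org.apache.spark package:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.NewHadoopRDD

// Serialize Configuration construction through the shared lock so the
// non-thread-safe constructor never runs concurrently (SPARK-1097).
def newGuardedConf(base: Configuration): Configuration =
  NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
    new Configuration(base) // copy constructor: starts from base's settings
  }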
@@ -268,6 +292,7 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)
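WholeTextFileRDD backs SparkContext.wholeTextFiles, so with this change its partition planning also goes through getConf and honors the cloning flag. A minimal usage sketch (the HDFS path is hypothetical):

// Each record is (file path, entire file contents).
val docs = sc.wholeTextFiles("hdfs:///data/docs")
docs.take(1).foreach { case (path, text) =>
  println(s"$path: ${text.length} chars")
}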