diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index 55cb25946c2ad..404b07492ad82 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -19,9 +19,9 @@ package org.apache.spark
 
 import java.io._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
+import org.apache.spark.deploy.SparkHadoopUtil
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
@@ -39,8 +39,10 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
-    ow.readFields(in)
+    SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      ow.setConf(SparkHadoopUtil.newConfiguration())
+      ow.readFields(in) // not thread safe
+    }
     t = ow.get().asInstanceOf[T]
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff04390d..0ccd2f5bb271d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -565,12 +565,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
-    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
-      confBroadcast,
+      hadoopConfiguration,
       Some(setInputPathsFunc),
       inputFormatClass,
       keyClass,
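The `hadoopFile` change above drops the broadcast of the Hadoop `Configuration`; the configuration is now wrapped in `SerializableWritable` and travels to tasks through ordinary Java serialization. A minimal round-trip sketch of that wrapper, assuming only Spark and Hadoop on the classpath (the key `example.key` is purely illustrative, not a real setting):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

object SerializableConfRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("example.key", "example.value") // illustrative entry used to verify the round trip

    // Serialize the wrapper the same way a task closure would be serialized.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new SerializableWritable(conf))
    out.close()

    // Deserializing exercises SerializableWritable.readObject, which with this patch
    // constructs its Configuration under SparkHadoopUtil's instantiation lock.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = in.readObject().asInstanceOf[SerializableWritable[Configuration]]
    assert(restored.value.get("example.key") == "example.value")
  }
}
```

The trade-off this accepts is that each task now carries its own serialized copy (the removed comment notes a configuration can be roughly 10 KB) instead of reading a shared broadcast value.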
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f349eac..0f0bb087908c1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
+    conf: SerializableWritable[Configuration],
     batchSize: Int) extends Converter[Any, Any] {
 
   /**
@@ -95,7 +95,7 @@ private[python] class WritableToJavaConverter(
         }
         map
       case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+        if (batchSize > 1) WritableUtils.clone(w, conf.value) else w
       case other => other
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 163dca6cade5a..27c2a05e260e2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -441,9 +441,8 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(sc.hadoopConfiguration()), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -467,9 +466,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -493,9 +491,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -536,9 +533,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -562,9 +558,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
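`WritableToJavaConverter` now unwraps the configuration with a single `conf.value` instead of the double `conf.value.value` on a broadcast. A standalone sketch of the clone call it performs for batched `Writable`s, assuming Spark and Hadoop on the classpath (the `Text` payload is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, WritableUtils}
import org.apache.spark.SerializableWritable

object CloneWritableSketch {
  def main(args: Array[String]): Unit = {
    val sConf = new SerializableWritable(new Configuration())
    val original = new Text("payload")
    // WritableUtils.clone needs a Configuration to instantiate the copy via reflection.
    val copy = WritableUtils.clone(original, sConf.value)
    assert(copy.toString == "payload" && !(copy eq original))
  }
}
```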
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e28eaad8a5180..5acb039c7d677 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -154,6 +154,20 @@ class SparkHadoopUtil extends Logging {
 }
 
 object SparkHadoopUtil {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
+  private[spark] val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
+   * Create a new Configuration in a thread-safe way.
+   */
+  private[spark] def newConfiguration(): Configuration = {
+    CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      new Configuration()
+    }
+  }
 
   private val hadoop = {
     val yarnMode = java.lang.Boolean.valueOf(
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 7ba1182f0ed27..39cf0683337a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -37,7 +36,8 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
-  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+  // Serializable configuration
+  private val sConf = new SerializableWritable(sc.hadoopConfiguration)
 
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
@@ -71,7 +71,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, broadcastedConf, context)
+    CheckpointRDD.readFromFile(file, sConf.value, context)
   }
 
   override def checkpoint() {
@@ -86,12 +86,12 @@ private[spark] object CheckpointRDD extends Logging {
 
   def writeToFile[T: ClassTag](
       path: String,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      conf: SerializableWritable[Configuration],
       blockSize: Int = -1
     )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
+    val fs = outputDir.getFileSystem(conf.value)
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -130,11 +130,11 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](
       path: Path,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      conf: Configuration,
       context: TaskContext
     ): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(broadcastedConf.value.value)
+    val fs = path.getFileSystem(conf)
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -159,8 +159,8 @@ private[spark] object CheckpointRDD extends Logging {
     val path = new Path(hdfsPath, "temp")
     val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
     val fs = path.getFileSystem(conf)
-    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
-    sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
+    sc.runJob(rdd, CheckpointRDD.writeToFile[Int](
+      path.toString, new SerializableWritable(conf), 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
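Both `writeToFile` and `readFromFile` above now resolve the `FileSystem` straight from a plain `Configuration` rather than from `broadcastedConf.value.value`. A tiny sketch of that lookup, assuming Hadoop on the classpath (the local path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object FileSystemFromConf {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("file:///tmp/checkpoint-demo")
    // Picks the FileSystem implementation registered for the path's scheme ("file" here).
    val fs = path.getFileSystem(conf)
    println(s"Resolved ${fs.getUri} for $path")
  }
}
```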
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb5616d3ec..31271d935d7fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -85,7 +84,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 * [[org.apache.spark.SparkContext.hadoopRDD()]]
 *
 * @param sc The SparkContext to associate the RDD with.
- * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed
 * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
 * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
 * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
@@ -98,7 +97,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 @DeveloperApi
 class HadoopRDD[K, V](
     sc: SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    @transient conf: Configuration,
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
@@ -106,6 +105,9 @@ class HadoopRDD[K, V](
     minPartitions: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  // The serializable configuration
+  private val sConf = new SerializableWritable(conf)
+
   def this(
       sc: SparkContext,
       conf: JobConf,
@@ -115,8 +117,7 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableWritable(conf))
-        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      conf,
       None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,
@@ -124,55 +125,19 @@ class HadoopRDD[K, V](
       minPartitions)
   }
 
-  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
-
-  protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
-
   // used to build JobTracker ID
   private val createTime = new Date()
 
-  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
-
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf = {
-    val conf: Configuration = broadcastedConf.value.value
-    if (shouldCloneJobConf) {
-      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
-      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
-      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
-      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
-      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
-      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
-      // disabled by default.
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        logDebug("Cloning Hadoop Configuration")
-        val newJobConf = new JobConf(conf)
-        if (!conf.isInstanceOf[JobConf]) {
-          initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        }
+  protected def getJobConf(): JobConf = sConf.value match {
+    case jobConf: JobConf => jobConf
+    case c => SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK synchronized {
+      val newJobConf = new JobConf(c)
+      initLocalJobConfFuncOpt.map(f => f(newJobConf))
       newJobConf
     }
-    } else {
-      if (conf.isInstanceOf[JobConf]) {
-        logDebug("Re-using user-broadcasted JobConf")
-        conf.asInstanceOf[JobConf]
-      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-        logDebug("Re-using cached JobConf")
-        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-      } else {
-        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-          logDebug("Creating new JobConf and caching it for later re-use")
-          val newJobConf = new JobConf(conf)
-          initLocalJobConfFuncOpt.map(f => f(newJobConf))
-          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-          newJobConf
-        }
-      }
-    }
   }
 
   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
@@ -319,12 +284,6 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD extends Logging {
-  /**
-   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
-   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
-   */
-  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
-
   /** Update the input bytes read metric each time this number of records has been read */
   val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 324563248793c..0e69a1b2739e6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -75,9 +75,7 @@ class NewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-  // private val serializableConf = new SerializableWritable(conf)
+  private val sConf = new SerializableWritable(conf)
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -106,7 +104,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = sConf.value
       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
@@ -220,7 +218,7 @@ class NewHadoopRDD[K, V](
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = sConf.value
 }
 
 private[spark] object NewHadoopRDD {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index f67e5f1857979..8a989559a2d4f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -90,9 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
     }
 
     // Save to file, and reload it as an RDD
-    val broadcastedConf = rdd.context.broadcast(
-      new SerializableWritable(rdd.context.hadoopConfiguration))
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
+    val sConf = new SerializableWritable(rdd.context.hadoopConfiguration)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, sConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
     if (newRDD.partitions.size != rdd.partitions.size) {
       throw new SparkException(
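The checkpoint write path above now hands `CheckpointRDD.writeToFile` a `SerializableWritable`-wrapped configuration instead of a broadcast. A small end-to-end sketch that drives this path from user code, assuming a local build of Spark with this patch (the master, app name, and checkpoint directory are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))
    sc.setCheckpointDir("/tmp/spark-checkpoint-demo")

    val rdd = sc.parallelize(1 to 100, 4)
    rdd.checkpoint()
    rdd.count()                // materializes the RDD and triggers the checkpoint write
    assert(rdd.isCheckpointed) // RDDCheckpointData has replaced the lineage with a CheckpointRDD

    sc.stop()
  }
}
```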
diff --git a/docs/configuration.md b/docs/configuration.md
index 3007706a2586e..5c61937932a89 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -639,15 +639,6 @@ Apart from these, the following properties are also available, and may be useful
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
-<tr>
-  <td><code>spark.hadoop.cloneConf</code></td>
-  <td>false</td>
-  <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
-    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
-    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
-    This is disabled by default in order to avoid unexpected performance regressions for jobs that
-    are not affected by these issues.</td>
-</tr>
 <tr>
   <td><code>spark.executor.heartbeatInterval</code></td>
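With `spark.hadoop.cloneConf` gone, the patch relies on a single JVM-wide lock (now hosted in `SparkHadoopUtil`) around every `Configuration`/`JobConf` construction. A standalone sketch of that locking discipline, using illustrative names rather than Spark's internal API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object ConfFactory {
  // One lock per JVM: Configuration's constructor is not thread-safe
  // (SPARK-1097, HADOOP-10456), so every instantiation funnels through it.
  private val ConfigurationInstantiationLock = new Object()

  def newConfiguration(): Configuration = ConfigurationInstantiationLock.synchronized {
    new Configuration()
  }

  def newJobConf(base: Configuration): JobConf = ConfigurationInstantiationLock.synchronized {
    new JobConf(base) // copies the entries of an existing Configuration
  }
}
```

This mirrors what `SerializableWritable.readObject` and `HadoopRDD.getJobConf` do in the patch: construction is serialized behind the lock, while reads of an already-built `Configuration` remain lock-free.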