From 19fc6daca078a7c08fe283ed983d9525a75f8cb3 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 29 Jul 2014 21:39:14 -0700 Subject: [PATCH 1/3] [SPARK-2585] Remove special handling of Hadoop JobConf. --- .../scala/org/apache/spark/SparkContext.scala | 7 +- .../org/apache/spark/rdd/HadoopRDD.scala | 70 +++++-------------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 8 +-- .../apache/spark/sql/hive/TableReader.scala | 26 ++----- 4 files changed, 31 insertions(+), 80 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fb4c86716bb8..2d8ce2e8132f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -553,16 +553,15 @@ class SparkContext(config: SparkConf) extends Logging { minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. - val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, - confBroadcast, - Some(setInputPathsFunc), + hadoopConfiguration, inputFormatClass, keyClass, valueClass, - minPartitions).setName(path) + minPartitions, + Some(setInputPathsFunc)).setName(path) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index e521612ffc27..af3d065c03ac 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -80,44 +80,27 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * [[org.apache.spark.SparkContext.hadoopRDD()]] * * @param sc The SparkContext to associate the RDD with. - * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed - * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. - * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. - * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD - * creates. + * @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed variable + * references an instance of JobConf, then that JobConf will be used for the Hadoop job. + * Otherwise, a new JobConf will be created using the enclosed Configuration. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate. + * @param initLocalJobConfFuncOpt Optional closure used to initialize a JobConf. 
*/ @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - initLocalJobConfFuncOpt: Option[JobConf => Unit], + @transient conf: Configuration, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int) + minPartitions: Int, + initLocalJobConfFuncOpt: Option[JobConf => Unit] = None) extends RDD[(K, V)](sc, Nil) with Logging { - def this( - sc: SparkContext, - conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minPartitions: Int) = { - this( - sc, - sc.broadcast(new SerializableWritable(conf)) - .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], - None /* initLocalJobConfFuncOpt */, - inputFormatClass, - keyClass, - valueClass, - minPartitions) - } + private val serializableConf = new SerializableWritable(conf) protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) @@ -127,26 +110,15 @@ class HadoopRDD[K, V]( private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. - protected def getJobConf(): JobConf = { - val conf: Configuration = broadcastedConf.value.value - if (conf.isInstanceOf[JobConf]) { - // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. - conf.asInstanceOf[JobConf] - } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { - // getJobConf() has been called previously, so there is already a local cache of the JobConf - // needed by this RDD. - HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] - } else { - // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the - // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). - // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456). - HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { - val newJobConf = new JobConf(conf) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf - } + protected def createJobConf(): JobConf = { + val conf: Configuration = serializableConf.value + conf match { + case jobConf: JobConf => + jobConf + case _: Configuration => + val jobConf = new JobConf(conf) + initLocalJobConfFuncOpt.foreach(f => f(jobConf)) + jobConf } } @@ -166,7 +138,7 @@ class HadoopRDD[K, V]( } override def getPartitions: Array[Partition] = { - val jobConf = getJobConf() + val jobConf = createJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) @@ -187,7 +159,7 @@ class HadoopRDD[K, V]( val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null - val jobConf = getJobConf() + val jobConf = createJobConf() val inputFormat = getInputFormat(jobConf) HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), context.stageId, theSplit.index, context.attemptId.toInt, jobConf) @@ -241,13 +213,9 @@ class HadoopRDD[K, V]( override def checkpoint() { // Do nothing. Hadoop RDD should not be checkpointed. 
} - - def getConf: Configuration = getJobConf() } private[spark] object HadoopRDD { - /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */ - val CONFIGURATION_INSTANTIATION_LOCK = new Object() /** * The three methods below are helpers for accessing the local map, a property of the SparkEnv of diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index f2b3a64bf134..39ef9bfc5fed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -69,9 +69,7 @@ class NewHadoopRDD[K, V]( with SparkHadoopMapReduceUtil with Logging { - // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) - // private val serializableConf = new SerializableWritable(conf) + private val serializableConf = new SerializableWritable(conf) private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -100,7 +98,7 @@ class NewHadoopRDD[K, V]( val iter = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = confBroadcast.value.value + val conf = serializableConf.value val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance @@ -161,8 +159,6 @@ class NewHadoopRDD[K, V]( val theSplit = split.asInstanceOf[NewHadoopPartition] theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost") } - - def getConf: Configuration = confBroadcast.value.value } private[spark] class WholeTextFileRDD( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 82c88280d775..df5f9728d490 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ import org.apache.hadoop.hive.ql.exec.Utilities @@ -30,11 +29,9 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} import org.apache.spark.SerializableWritable -import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast} -import org.apache.spark.sql.catalyst.types.DataType /** * A trait for subclasses that handle table scans. @@ -64,13 +61,6 @@ class HadoopTableReader( // TODO: set aws s3 credentials. - private val _broadcastedHiveConf = - sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf)) - - def broadcastedHiveConf = _broadcastedHiveConf - - def hiveConf = _broadcastedHiveConf.value.value - override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( hiveTable, @@ -97,7 +87,7 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. 
val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHiveConf + val hiveconfWrapper = new SerializableWritable(sc.hiveconf) val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) @@ -110,9 +100,8 @@ class HadoopTableReader( val attrsWithIndex = attributes.zipWithIndex val mutableRow = new GenericMutableRow(attrsWithIndex.length) val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) + deserializer.initialize(hiveconfWrapper.value, tableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) } @@ -161,8 +150,8 @@ class HadoopTableReader( } // Create local references so that the outer object isn't serialized. + val hiveconfWrapper = new SerializableWritable(sc.hiveconf) val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHiveConf val localDeserializer = partDeserializer val mutableRow = new GenericMutableRow(attributes.length) @@ -185,9 +174,8 @@ class HadoopTableReader( val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) hivePartitionRDD.mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value val deserializer = localDeserializer.newInstance() - deserializer.initialize(hconf, partProps) + deserializer.initialize(hiveconfWrapper.value, partProps) // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow) @@ -229,12 +217,12 @@ class HadoopTableReader( val rdd = new HadoopRDD( sc.sparkContext, - _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], - Some(initializeJobConfFunc), + sc.hiveconf, inputFormatClass, classOf[Writable], classOf[Writable], - _minSplitsPerRDD) + _minSplitsPerRDD, + Some(initializeJobConfFunc)) // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) From 7abe8d6a1b81fd82a538c91b004b0e3ddf53e395 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 30 Jul 2014 16:27:45 -0700 Subject: [PATCH 2/3] Remove JobConf broadcast comment. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2d8ce2e8132f..c7c19b0dbe35 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -533,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging { valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { - // Add necessary security credentials to the JobConf before broadcasting it. + // Add necessary security credentials to the JobConf. SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions) } @@ -552,7 +552,6 @@ class SparkContext(config: SparkConf) extends Logging { valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { - // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. 
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, From a56005bf6892a7aec2da61cc24d2a1681fd3f471 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 4 Aug 2014 18:39:02 -0700 Subject: [PATCH 3/3] Set the number of partitions to 2. --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 4fef07116171..c558a652aa34 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.TimeZone +import org.apache.spark.sql.SQLConf import org.scalatest.BeforeAndAfter import org.apache.spark.sql.hive.test.TestHive @@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { override def beforeAll() { TestHive.cacheTables = true + TestHive.set(SQLConf.SHUFFLE_PARTITIONS, "2") // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) originalTimeZone = TimeZone.getDefault TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
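
Note on the resulting API (a reviewer sketch, not part of the patches): after this series, HadoopRDD takes a plain Hadoop Configuration directly instead of a pre-broadcast SerializableWritable[Configuration], and the optional JobConf initializer moves to a trailing defaulted parameter. A minimal sketch of the new call pattern is below; `sc` (an existing SparkContext), the TextInputFormat/LongWritable/Text classes, and the input path are illustrative assumptions, not taken from the patches.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    // Hypothetical input path; JobConf setup goes through the new optional
    // initLocalJobConfFuncOpt parameter rather than a caller-built JobConf.
    val setInputPathsFunc = (jobConf: JobConf) =>
      FileInputFormat.setInputPaths(jobConf, "/tmp/example-input")

    val rdd = new HadoopRDD[LongWritable, Text](
      sc,                      // assumed existing SparkContext
      sc.hadoopConfiguration,  // plain Configuration; no manual broadcast
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      2,                       // minPartitions
      Some(setInputPathsFunc))

This mirrors how SparkContext.hadoopFile constructs the RDD after the change: the Configuration is handed over as-is and a JobConf is created on demand via createJobConf(), with the initializer applied only when the conf is not already a JobConf.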