10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -533,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
// Add necessary security credentials to the JobConf.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
@@ -552,17 +552,15 @@ class SparkContext(config: SparkConf) extends Logging {
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
hadoopConfiguration,
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
minPartitions,
Some(setInputPathsFunc)).setName(path)
}
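For reference, a hedged usage sketch of the public `hadoopFile` API whose internals the hunk above changes; the input path and app name are hypothetical, and the call itself is unaffected by this PR:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: read a text file through the old mapred API. Only the way the
// Hadoop configuration reaches the underlying HadoopRDD changes in this PR.
val sc = new SparkContext(new SparkConf().setAppName("hadoopFile-example"))
val lines = sc.hadoopFile(
  "hdfs:///tmp/example-input",   // hypothetical path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
lines.map { case (_, text) => text.toString }.take(5).foreach(println)
```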

70 changes: 19 additions & 51 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -80,44 +80,27 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* [[org.apache.spark.SparkContext.hadoopRDD()]]
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
* creates.
* @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed variable
* references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created using the enclosed Configuration.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
* @param initLocalJobConfFuncOpt Optional closure used to initialize a JobConf.
*/
@DeveloperApi
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
@transient conf: Configuration,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
minPartitions: Int,
initLocalJobConfFuncOpt: Option[JobConf => Unit] = None)
extends RDD[(K, V)](sc, Nil) with Logging {

def this(
sc: SparkContext,
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
valueClass,
minPartitions)
}
private val serializableConf = new SerializableWritable(conf)

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)

@@ -127,26 +110,15 @@ class HadoopRDD[K, V](
private val createTime = new Date()

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
}
protected def createJobConf(): JobConf = {
val conf: Configuration = serializableConf.value
Inline review comments:

Contributor: Is this guaranteed to return a new copy of the conf for every partition or something? Because otherwise I'm not sure I see why we can safely remove the lock.

Contributor (author): It is because RDD objects are not reused at all. Each task gets its own deserialized copy of the HadoopRDD and the conf.

Contributor: It might be worth a comment here then saying that the createJobConf() method really does create a new job conf because [xyz] even though it looks like it's just accessing the broadcast value.

conf match {
case jobConf: JobConf =>
jobConf
case _: Configuration =>
val jobConf = new JobConf(conf)
initLocalJobConfFuncOpt.foreach(f => f(jobConf))
jobConf
}
}
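
Following up on the last review comment above, a hedged sketch of what the suggested explanatory comment could look like; it reuses the createJobConf body from this diff and assumes the surrounding HadoopRDD members (serializableConf, initLocalJobConfFuncOpt), so the comment wording is illustrative rather than part of the PR:

```scala
// Returns a JobConf used on executors to obtain input splits for Hadoop reads.
protected def createJobConf(): JobConf = {
  // Although this looks like it hands out a shared Configuration, each task
  // deserializes its own copy of the HadoopRDD (and of serializableConf), so
  // the JobConf built here is private to the task and needs no lock or cache.
  val conf: Configuration = serializableConf.value
  conf match {
    case jobConf: JobConf =>
      // The user supplied a JobConf directly; use it as-is.
      jobConf
    case _: Configuration =>
      // Wrap the plain Configuration in a fresh, task-local JobConf.
      val jobConf = new JobConf(conf)
      initLocalJobConfFuncOpt.foreach(f => f(jobConf))
      jobConf
  }
}
```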

@@ -166,7 +138,7 @@ class HadoopRDD[K, V](
}

override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
val jobConf = createJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
@@ -187,7 +159,7 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val jobConf = createJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
@@ -241,13 +213,9 @@ class HadoopRDD[K, V](
override def checkpoint() {
// Do nothing. Hadoop RDD should not be checkpointed.
}

def getConf: Configuration = getJobConf()
}

private[spark] object HadoopRDD {
/** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
val CONFIGURATION_INSTANTIATION_LOCK = new Object()

/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
8 changes: 2 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -69,9 +69,7 @@ class NewHadoopRDD[K, V](
with SparkHadoopMapReduceUtil
with Logging {

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
private val serializableConf = new SerializableWritable(conf)

private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -100,7 +98,7 @@ class NewHadoopRDD[K, V](
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val conf = serializableConf.value
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
@@ -161,8 +159,6 @@ class NewHadoopRDD[K, V](
val theSplit = split.asInstanceOf[NewHadoopPartition]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
}

def getConf: Configuration = confBroadcast.value.value
}
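
Since both HadoopRDD and NewHadoopRDD now rely on it instead of an explicit broadcast, here is a hedged, standalone sketch of the SerializableWritable pattern (the configuration key is hypothetical and not from this diff):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

// Hadoop's Configuration is Writable but not java.io.Serializable, so it cannot
// be captured in a task closure directly. SerializableWritable wraps it and
// delegates Java serialization to Writable's write/readFields, so every
// deserialized task ends up with its own private copy of the configuration.
val conf = new Configuration()
conf.set("example.key", "example.value")   // hypothetical key, for illustration

val wrapper = new SerializableWritable(conf)
// `wrapper` can be referenced from an RDD closure; on the executor,
// wrapper.value yields the deserialized, task-local Configuration.
val restored: Configuration = wrapper.value
```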

private[spark] class WholeTextFileRDD(
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.util.TimeZone

import org.apache.spark.sql.SQLConf
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.hive.test.TestHive
Expand All @@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

override def beforeAll() {
TestHive.cacheTables = true
TestHive.set(SQLConf.SHUFFLE_PARTITIONS, "2")
Inline review comments:

Contributor: Just a note: we need to remove this setting before merging it.

Contributor (author): We should keep it at 2 to speed up tests ...

Contributor: This actually speeds up the tests quite a bit, although it might be masking some of the expensive serialization/deserialization issues.

// Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
originalTimeZone = TimeZone.getDefault
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
@@ -30,11 +29,9 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}

import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}

import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast}
import org.apache.spark.sql.catalyst.types.DataType

/**
* A trait for subclasses that handle table scans.
@@ -64,13 +61,6 @@ class HadoopTableReader(

// TODO: set aws s3 credentials.

private val _broadcastedHiveConf =
sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf))

def broadcastedHiveConf = _broadcastedHiveConf

def hiveConf = _broadcastedHiveConf.value.value

override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
makeRDDForTable(
hiveTable,
@@ -97,7 +87,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val hiveconfWrapper = new SerializableWritable(sc.hiveconf)

val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -110,9 +100,8 @@ val attrsWithIndex = attributes.zipWithIndex
val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new GenericMutableRow(attrsWithIndex.length)
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
deserializer.initialize(hiveconfWrapper.value, tableDesc.getProperties)

HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
}
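
The hunks above lean on the idiom described by the in-diff comments ("Create local references ... so that the entire `this` object won't be serialized"). A hedged, self-contained sketch of that idiom, with hypothetical names not taken from HadoopTableReader:

```scala
import org.apache.spark.rdd.RDD

// Referencing a member such as `tableName` inside mapPartitions would capture
// `this`, dragging the whole enclosing reader (possibly with non-serializable
// state such as a Hive configuration) into the task closure. Copying the member
// into a local val first keeps the serialized closure small and safe.
class ExampleReader(tableName: String) {
  def annotate(rdd: RDD[String]): RDD[String] = {
    val localTableName = tableName   // local copy; only this String is serialized
    rdd.mapPartitions { iter =>
      iter.map(line => localTableName + ": " + line)
    }
  }
}
```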
@@ -161,8 +150,8 @@ class HadoopTableReader(
}

// Create local references so that the outer object isn't serialized.
val hiveconfWrapper = new SerializableWritable(sc.hiveconf)
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val localDeserializer = partDeserializer
val mutableRow = new GenericMutableRow(attributes.length)

@@ -185,9 +174,8 @@

val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
hivePartitionRDD.mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
deserializer.initialize(hiveconfWrapper.value, partProps)

// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow)
@@ -229,12 +217,12 @@

val rdd = new HadoopRDD(
sc.sparkContext,
_broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
Some(initializeJobConfFunc),
sc.hiveconf,
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD)
_minSplitsPerRDD,
Some(initializeJobConfFunc))

// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)