
Commit b4cd73e

Author: Davies Liu (committed)
remove unnecessary broadcast for conf
1 parent 222fa47 commit b4cd73e

File tree

8 files changed: +26 additions, −39 deletions

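Every file in this commit applies the same refactoring: a Hadoop Configuration that used to be wrapped in a SerializableWritable and then broadcast is now passed around as a plain SerializableWritable and captured in the task closure, so readers call conf.value instead of confBroadcast.value.value. The sketch below contrasts the two patterns. It is illustrative only, assuming a Spark 1.x classpath; the object and method names are hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

object ConfShippingSketch {
  // Old pattern: broadcast the wrapped conf, read it on executors with two .value hops.
  def withBroadcast(sc: SparkContext, hadoopConf: Configuration): Unit = {
    val confBroadcast = sc.broadcast(new SerializableWritable(hadoopConf))
    sc.parallelize(1 to 2).foreach { _ =>
      val conf: Configuration = confBroadcast.value.value
      // ... use conf on the executor ...
    }
  }

  // New pattern: ship the SerializableWritable itself in the closure, read it with one .value.
  def withoutBroadcast(sc: SparkContext, hadoopConf: Configuration): Unit = {
    val sConf = new SerializableWritable(hadoopConf)
    sc.parallelize(1 to 2).foreach { _ =>
      val conf: Configuration = sConf.value
      // ... use conf on the executor ...
    }
  }
}

As the deleted comments note, the configuration is roughly 10 KB, so after this change it is serialized with each task closure rather than broadcast once per executor.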

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 3 deletions
@@ -566,12 +566,10 @@ class SparkContext(config: SparkConf) extends Logging {
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
-    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
-      confBroadcast,
+      new SerializableWritable(hadoopConfiguration),
       Some(setInputPathsFunc),
       inputFormatClass,
       keyClass,
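
For reference, the public hadoopFile API that reaches this constructor is unchanged; only the internal plumbing differs. A minimal, hypothetical usage sketch (the input path and app settings are made up):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HadoopFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    // hadoopFile builds the HadoopRDD shown above; after this change the Hadoop
    // configuration rides inside the RDD as a SerializableWritable, not a broadcast.
    val lines = sc.hadoopFile("/data/input", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 4)   // "/data/input" is hypothetical
    lines.map(_._2.toString).take(5).foreach(println)
    sc.stop()
  }
}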

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 2 additions & 2 deletions
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
+    conf: SerializableWritable[Configuration],
     batchSize: Int) extends Converter[Any, Any] {
 
   /**
@@ -95,7 +95,7 @@ private[python] class WritableToJavaConverter(
         }
         map
       case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+        if (batchSize > 1) WritableUtils.clone(w, conf.value) else w
       case other => other
     }
   }
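
The converter's only use of the configuration is cloning Writables when batching, which is why a single level of .value now suffices. A standalone sketch of that call, assuming only a Hadoop client on the classpath (the object name is made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, Writable, WritableUtils}
import org.apache.spark.SerializableWritable

object CloneSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SerializableWritable(new Configuration())
    val w: Writable = new Text("hello")
    // With the wrapper passed directly, one .value yields the Configuration
    // that WritableUtils.clone needs (previously conf.value.value via the broadcast).
    val copy = WritableUtils.clone(w, conf.value)
    println(copy)
  }
}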

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 5 additions & 10 deletions
@@ -432,9 +432,8 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(sc.hadoopConfiguration()), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -458,9 +457,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -484,9 +482,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -527,9 +524,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -553,9 +549,8 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 7 additions & 9 deletions
@@ -37,8 +37,6 @@ private[spark]
 class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
-  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
-
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
   override def getPartitions: Array[Partition] = {
@@ -71,7 +69,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, broadcastedConf, context)
+    CheckpointRDD.readFromFile(file, new SerializableWritable(sc.hadoopConfiguration), context)
   }
 
   override def checkpoint() {
@@ -86,12 +84,12 @@ private[spark] object CheckpointRDD extends Logging {
 
   def writeToFile[T: ClassTag](
       path: String,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      conf: SerializableWritable[Configuration],
       blockSize: Int = -1
     )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
+    val fs = outputDir.getFileSystem(conf.value)
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -130,11 +128,11 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](
       path: Path,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      conf: SerializableWritable[Configuration],
       context: TaskContext
     ): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(broadcastedConf.value.value)
+    val fs = path.getFileSystem(conf.value)
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -159,8 +157,8 @@ private[spark] object CheckpointRDD extends Logging {
     val path = new Path(hdfsPath, "temp")
     val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
     val fs = path.getFileSystem(conf)
-    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
-    sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
+    sc.runJob(rdd, CheckpointRDD.writeToFile[Int](
+      path.toString, new SerializableWritable(conf), 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
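
writeToFile and readFromFile are private[spark], so nothing outside Spark calls them directly; the change is exercised through the ordinary checkpointing path. A hypothetical end-to-end sketch (directory and settings made up):

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/checkpoints")   // hypothetical directory
    val rdd = sc.parallelize(1 to 100, 4)
    rdd.checkpoint()
    // The first action materializes the checkpoint, driving RDDCheckpointData ->
    // CheckpointRDD.writeToFile, which now receives the conf as a SerializableWritable.
    println(rdd.count())
    sc.stop()
  }
}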

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 4 deletions
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 @DeveloperApi
 class HadoopRDD[K, V](
     sc: SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    conf: SerializableWritable[Configuration],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
@@ -116,8 +116,7 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableWritable(conf))
-        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      new SerializableWritable(conf),
       None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,
@@ -136,7 +135,7 @@ class HadoopRDD[K, V](
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
-    val conf: Configuration = broadcastedConf.value.value
+    val conf: Configuration = this.conf.value
     if (shouldCloneJobConf) {
       // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
       // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
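
Since the primary HadoopRDD constructor is @DeveloperApi, this is a visible signature change for anyone constructing the RDD directly; the JobConf-based secondary constructor keeps its shape and now does the wrapping itself (second hunk above). A hypothetical sketch of that secondary constructor, with a made-up input path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.HadoopRDD

object HadoopRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val jobConf = new JobConf(sc.hadoopConfiguration)
    FileInputFormat.setInputPaths(jobConf, "/data/input")   // hypothetical path
    // This constructor now wraps jobConf in a SerializableWritable itself
    // instead of broadcasting it.
    val rdd = new HadoopRDD[LongWritable, Text](sc, jobConf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 4)
    println(rdd.partitions.length)
    sc.stop()
  }
}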

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 3 additions & 4 deletions
@@ -73,8 +73,7 @@ class NewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val sConf = new SerializableWritable(conf)
   // private val serializableConf = new SerializableWritable(conf)
 
   private val jobTrackerId: String = {
@@ -104,7 +103,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = sConf.value
       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
@@ -190,7 +189,7 @@ class NewHadoopRDD[K, V](
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = sConf.value
 }
 
 private[spark] object NewHadoopRDD {

core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala

Lines changed: 2 additions & 3 deletions
@@ -90,9 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
     }
 
     // Save to file, and reload it as an RDD
-    val broadcastedConf = rdd.context.broadcast(
-      new SerializableWritable(rdd.context.hadoopConfiguration))
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
+    val conf = new SerializableWritable(rdd.context.hadoopConfiguration)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, conf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
     if (newRDD.partitions.size != rdd.partitions.size) {
       throw new SparkException(

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 3 additions & 4 deletions
@@ -64,8 +64,7 @@ class HadoopTableReader(
 
   // TODO: set aws s3 credentials.
 
-  private val _broadcastedHiveConf =
-    sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf))
+  private val conf = new SerializableWritable(hiveExtraConf)
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
     makeRDDForTable(
@@ -93,7 +92,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val _conf = conf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -107,7 +106,7 @@ class HadoopTableReader(
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
+      val hconf = _conf.value
       val deserializer = deserializerClass.newInstance()
      deserializer.initialize(hconf, tableDesc.getProperties)
      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
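
The "local references" comment in this hunk is the reason the field can simply be copied into _conf: the mapPartitions closure then captures the local val rather than the enclosing HadoopTableReader. A generic, hypothetical illustration of that trick (class and names invented, not Spark code):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical holder class, used only to illustrate the local-reference trick.
class ConfHolder(sc: SparkContext, setting: String) {
  private val conf = setting   // stands in for the SerializableWritable field

  def demo(): Long = {
    // Copy the field into a local val so the closure below captures only the
    // value, never the enclosing ConfHolder instance.
    val _conf = conf
    sc.parallelize(1 to 10).map(i => s"${_conf}-$i").count()
  }
}

object LocalRefSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    println(new ConfHolder(sc, "hive.exec.scratchdir").demo())
    sc.stop()
  }
}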

0 commit comments
