Commit c52209f

yaooqinn authored and yhuang-db committed
[SPARK-52052][CORE] Add .broadcast in the companion object of SerializableConfiguration
### What changes were proposed in this pull request?

Add a `.broadcast` helper in the companion object of `SerializableConfiguration`.

### Why are the changes needed?

- Simplify broadcast logic for `SerializableConfiguration`
- Reduce SparkContext API usage across non-core modules
- Bypass an IDE type inference issue

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passes GA (GitHub Actions).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50844 from yaooqinn/SPARK-52052.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8e59402 commit c52209f
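
As a quick orientation before the per-file diffs, here is the caller-side shape of the change — a minimal sketch assuming a `SparkContext` named `sc` and a Hadoop `Configuration` named `hadoopConf`; both the wrapper and the new helper are `private[spark]`, so this only concerns code inside the Spark tree, and the example object itself is hypothetical:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Hypothetical illustration, not part of the patch.
object BroadcastConfExample {
  // Old pattern, repeated across modules before this commit:
  def before(sc: SparkContext, hadoopConf: Configuration): Broadcast[SerializableConfiguration] =
    sc.broadcast(new SerializableConfiguration(hadoopConf))

  // New pattern, using the companion-object helper added by this commit:
  def after(sc: SparkContext, hadoopConf: Configuration): Broadcast[SerializableConfiguration] =
    SerializableConfiguration.broadcast(sc, hadoopConf)
}
```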

File tree

30 files changed: +65 −54 lines changed (10 of the per-file diffs are reproduced below)

connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala

Lines changed: 1 addition & 2 deletions
@@ -48,8 +48,7 @@ case class AvroScan(
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val broadcastedConf = sparkSession.sparkContext.broadcast(
-      new SerializableConfiguration(hadoopConf))
+    val broadcastedConf = SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
     val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf)
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 5 additions & 5 deletions
@@ -366,7 +366,7 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName[K](keyClass)
     val vc = Utils.classForName[V](valueClass)
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
+    val confBroadcasted = SerializableConfiguration.broadcast(sc.sc, sc.hadoopConfiguration())
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -392,7 +392,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
+    val confBroadcasted = SerializableConfiguration.broadcast(sc.sc, mergedConf)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -418,7 +418,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
+    val confBroadcasted = SerializableConfiguration.broadcast(sc.sc, conf)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -461,7 +461,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
+    val confBroadcasted = SerializableConfiguration.broadcast(sc.sc, mergedConf)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -487,7 +487,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
+    val confBroadcasted = SerializableConfiguration.broadcast(sc.sc, conf)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -146,8 +146,7 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableConfiguration(conf))
-        .asInstanceOf[Broadcast[SerializableConfiguration]],
+      SerializableConfiguration.broadcast(sc, conf),
       initLocalJobConfFuncOpt = None,
       inputFormatClass,
       keyClass,

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -104,8 +104,7 @@ class NewHadoopRDD[K, V](

   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
-  // private val serializableConf = new SerializableWritable(_conf)
+  private val confBroadcast = SerializableConfiguration.broadcast(sc, _conf)

   private val jobTrackerId: String = {
     val dateTimeFormatter =

core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala

Lines changed: 2 additions & 3 deletions
@@ -47,7 +47,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
   @transient private val fs = cpath.getFileSystem(hadoopConf)
-  private val broadcastedConf = sc.broadcast(new SerializableConfiguration(hadoopConf))
+  private val broadcastedConf = SerializableConfiguration.broadcast(sc, hadoopConf)

   // Fail fast if checkpoint directory does not exist
   require(fs.exists(cpath), s"Checkpoint directory does not exist: $checkpointPath")
@@ -161,8 +161,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }

     // Save to file, and reload it as an RDD
-    val broadcastedConf = sc.broadcast(
-      new SerializableConfiguration(sc.hadoopConfiguration))
+    val broadcastedConf = SerializableConfiguration.broadcast(sc)
     // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
     sc.runJob(originalRDD,
       writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala

Lines changed: 12 additions & 0 deletions
@@ -20,7 +20,9 @@ import java.io.{ObjectInputStream, ObjectOutputStream}

 import org.apache.hadoop.conf.Configuration

+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Unstable}
+import org.apache.spark.broadcast.Broadcast

 /**
  * Hadoop configuration but serializable. Use `value` to access the Hadoop configuration.
@@ -39,3 +41,13 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
     value.readFields(in)
   }
 }
+
+private[spark] object SerializableConfiguration {
+  def broadcast(sc: SparkContext, conf: Configuration): Broadcast[SerializableConfiguration] = {
+    sc.broadcast(new SerializableConfiguration(conf))
+  }
+
+  def broadcast(sc: SparkContext): Broadcast[SerializableConfiguration] = {
+    broadcast(sc, sc.hadoopConfiguration)
+  }
+}
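
The two overloads above are broadcast on the driver and read back inside tasks via `.value`. A minimal usage sketch, assuming code inside the Spark codebase (the helper is `private[spark]`); the `fileLengths` method and its `paths` parameter are hypothetical, not part of the patch:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.util.SerializableConfiguration

def fileLengths(sc: SparkContext, paths: Seq[String]): Array[Long] = {
  // Driver side: broadcast the Hadoop Configuration once
  // (the one-argument overload defaults to sc.hadoopConfiguration).
  val broadcastedConf = SerializableConfiguration.broadcast(sc)

  sc.parallelize(paths).map { p =>
    // Executor side: Broadcast#value yields the wrapper, wrapper.value the Configuration.
    val hadoopConf = broadcastedConf.value.value
    val path = new Path(p)
    path.getFileSystem(hadoopConf).getFileStatus(path).getLen
  }.collect()
}
```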

mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ private[image] case class ImageFileFormat() extends FileFormat with DataSourceRe
       "Image data source only produces a single data column named \"image\".")

     val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)

     val imageSourceOptions = new ImageOptions(options)

mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ private[libsvm] case class LibSVMFileFormat()
     val isSparse = libSVMOptions.isSparse

     val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)

     (file: PartitionedFile) => {
       val linesReader = Utils.createResourceUninterruptiblyIfInTaskThread(

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -86,7 +86,7 @@ private[sql] class AvroFileFormat extends FileFormat
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {

     val broadcastedConf =
-      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      SerializableConfiguration.broadcast(spark.sparkContext, hadoopConf)
     val parsedOptions = new AvroOptions(options, hadoopConf)
     val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ case class BinaryFileFormat() extends FileFormat with DataSourceRegister {
       """.stripMargin)

     val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
     val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
     val maxLength = sparkSession.sessionState.conf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH)
