
Commit 7134e55

create SparkHadoopWriterConfig directly in PairRDDFunctions.
1 parent: 6d68b4f

File tree

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterConfig.scala
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

2 files changed: +3 -31 lines

2 files changed

+3
-31
lines changed

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterConfig.scala

Lines changed: 0 additions & 24 deletions
@@ -82,27 +82,3 @@ abstract class SparkHadoopWriterConfig[K, V: ClassTag] extends Serializable {
   def checkOutputSpecs(jobContext: JobContext): Unit
 
 }
-
-object SparkHadoopWriterConfig {
-
-  /**
-   * Instantiates a SparkHadoopWriterConfig using the given configuration.
-   */
-  def instantiate[K, V](className: String, conf: Configuration)(
-      implicit ctorArgTag: ClassTag[(K, V)]): SparkHadoopWriterConfig[K, V] = {
-    val clazz = Utils.classForName(className).asInstanceOf[Class[SparkHadoopWriterConfig[K, V]]]
-
-    // First try the one with argument (conf: SerializableConfiguration).
-    // If that doesn't exist, try the one with (conf: SerializableJobConf).
-    try {
-      val ctor = clazz.getDeclaredConstructor(
-        classOf[SerializableConfiguration], classOf[ClassTag[(K, V)]])
-      ctor.newInstance(new SerializableConfiguration(conf), ctorArgTag)
-    } catch {
-      case _: NoSuchMethodException =>
-        val ctor = clazz.getDeclaredConstructor(
-          classOf[SerializableJobConf], classOf[ClassTag[(K, V)]])
-        ctor.newInstance(new SerializableJobConf(conf.asInstanceOf[JobConf]), ctorArgTag)
-    }
-  }
-}
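
The removed instantiate helper is an instance of a general reflective pattern: look up one constructor signature and fall back to another if it is absent. The sketch below shows that pattern in isolation; it is not Spark code, and WrapperA, WrapperB, TargetConfig, and ReflectiveInstantiateSketch are hypothetical stand-ins (WrapperA and WrapperB play the roles of SerializableConfiguration and SerializableJobConf).

// Hypothetical names throughout; a minimal sketch of the
// try-one-constructor-then-fall-back pattern, not Spark code.
class WrapperA(val value: String)   // stand-in for SerializableConfiguration
class WrapperB(val value: String)   // stand-in for SerializableJobConf

// A target class that only declares the "fallback" (WrapperB) constructor.
class TargetConfig(b: WrapperB) {
  def describe: String = s"built from WrapperB(${b.value})"
}

object ReflectiveInstantiateSketch {
  def instantiate(className: String, raw: String): Any = {
    val clazz = Class.forName(className)
    try {
      // First try the constructor taking a WrapperA ...
      clazz.getDeclaredConstructor(classOf[WrapperA]).newInstance(new WrapperA(raw))
    } catch {
      case _: NoSuchMethodException =>
        // ... and if it does not exist, fall back to the WrapperB constructor.
        clazz.getDeclaredConstructor(classOf[WrapperB]).newInstance(new WrapperB(raw))
    }
  }

  def main(args: Array[String]): Unit = {
    val config = instantiate(classOf[TargetConfig].getName, "job-conf")
    println(config.asInstanceOf[TargetConfig].describe)   // built from WrapperB(job-conf)
  }
}

The commit removes this indirection: both call sites in PairRDDFunctions statically know which config class they need, so a direct constructor call does the job without reflection or a string class name.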

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 7 deletions
@@ -39,7 +39,7 @@ import org.apache.spark.internal.io._
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.util.random.StratifiedSamplingUtils
 
@@ -1051,9 +1051,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * configured for a Hadoop MapReduce job.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
-    val config = SparkHadoopWriterConfig.instantiate[K, V](
-      className = classOf[SparkHadoopMapReduceWriterConfig[K, V]].getName,
-      conf = conf)
+    val config = new SparkHadoopMapReduceWriterConfig[K, V](new SerializableConfiguration(conf))
     SparkHadoopWriter.write(
       rdd = self,
       config = config)
@@ -1066,9 +1064,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
-    val config = SparkHadoopWriterConfig.instantiate[K, V](
-      className = classOf[SparkHadoopMapRedWriterConfig[K, V]].getName,
-      conf = conf)
+    val config = new SparkHadoopMapRedWriterConfig[K, V](new SerializableJobConf(conf))
     SparkHadoopWriter.write(
       rdd = self,
       config = config)
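
Both methods touched in this file are public entry points on pair RDDs, and their signatures are unchanged; only the internal construction of the writer config differs. For context, a hedged caller-side sketch of the new-API path follows (the app name, output path, and key/value types are illustrative assumptions, not part of this commit).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

object SaveAsNewApiDatasetExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-example").setMaster("local[*]"))

    // A small pair RDD with Writable keys and values.
    val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))
      .map { case (k, v) => (new Text(k), new Text(v)) }

    // Describe the output through a new-API (mapreduce) Job, then hand its
    // Configuration to saveAsNewAPIHadoopDataset, the code path whose
    // writer config is now constructed directly in PairRDDFunctions.
    val job = Job.getInstance(new Configuration())
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/save-example-out"))

    pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}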
