
Commit 665653d

zsxwing authored and rxin committed
[SPARK-2075][Core] Make the compiler generate the same bytecode for Hadoop 1.+ and Hadoop 2.+
`NullWritable` is a `Comparable` rather than a `Comparable[NullWritable]` in Hadoop 1.+, so the compiler cannot find an implicit Ordering for it and therefore generates different anonymous classes for `saveAsTextFile` under Hadoop 1.+ and Hadoop 2.+. This change provides an Ordering for `NullWritable` so that the compiler generates the same code for both. I used the following commands to confirm that the generated bytecode is the same:

```
mvn -Dhadoop.version=1.2.1 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop1.txt

mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop2.txt

diff ~/hadoop1.txt ~/hadoop2.txt
```

However, the compiler still generates different code for classes that call methods of `JobContext`/`TaskAttemptContext`: `JobContext`/`TaskAttemptContext` is a class in Hadoop 1.+, so calling its methods uses `invokevirtual`, while it is an interface in Hadoop 2.+, where the same call uses `invokeinterface`. To fix this, we use reflection to call `JobContext/TaskAttemptContext.getConfiguration`.

Author: zsxwing <[email protected]>

Closes #3740 from zsxwing/SPARK-2075 and squashes the following commits:

39d9df2 [zsxwing] Fix the code style
e4ad8b5 [zsxwing] Use null for the implicit Ordering
734bac9 [zsxwing] Explicitly set the implicit parameters
ca03559 [zsxwing] Use reflection to access JobContext/TaskAttemptContext.getConfiguration
fa40db0 [zsxwing] Add an Ordering for NullWritable to make the compiler generate same byte codes for RDD

(cherry picked from commit 6ee6aa7)
Signed-off-by: Reynold Xin <[email protected]>
1 parent 4346a2b commit 665653d
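The commit's actual helper lives in `SparkHadoopUtil` (see the diff below); the following is only a minimal standalone sketch of the reflective pattern the message describes, using a hypothetical name `configurationOf`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext

// Sketch of the idea: resolving getConfiguration via reflection means the
// call site compiles to the same bytecode whether JobContext is a class
// (Hadoop 1.+, invokevirtual) or an interface (Hadoop 2.+, invokeinterface).
def configurationOf(context: JobContext): Configuration = {
  val method = context.getClass.getMethod("getConfiguration")
  method.invoke(context).asInstanceOf[Configuration]
}
```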

6 files changed: +41 -6 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 12 additions & 0 deletions
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation

@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
       Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
     statisticsDataClass.getDeclaredMethod(methodName)
   }
+
+  /**
+   * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
+   * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
+   * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
+   * while it's interface in Hadoop 2.+.
+   */
+  def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    val method = context.getClass.getMethod("getConfiguration")
+    method.invoke(context).asInstanceOf[Configuration]
+  }
 }

 object SparkHadoopUtil {
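As a rough usage sketch (hypothetical test code, not part of this commit), the new helper could be exercised with a `Job`, which is a `JobContext` under both Hadoop lines; the Hadoop 2.x `Job.getInstance` factory is assumed here:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical check: the reflective lookup should return the context's
// Configuration regardless of whether getConfiguration is declared on a
// class (Hadoop 1.+) or an interface (Hadoop 2.+).
val conf = new Configuration()
conf.set("spark.test.key", "value")
val job = Job.getInstance(conf) // Hadoop 2.x factory; Hadoop 1.x would use `new Job(conf)`
val viaReflection = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
assert(viaReflection.get("spark.test.key") == "value")
```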

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.spark.deploy.SparkHadoopUtil

 /**
  * Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat {

   /** Retrieves the record length property from a Hadoop configuration */
   def getRecordLength(context: JobContext): Int = {
-    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
   }
 }

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.deploy.SparkHadoopUtil

 /**
  * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
     // the actual file we will be reading from
     val file = fileSplit.getPath
     // job configuration
-    val job = context.getConfiguration
+    val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
     // check compression
     val codec = new CompressionCodecFactory(job).getCodec(file)
     if (codec != null) {

core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 3 additions & 1 deletion
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

 import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil

 /**
  * A general format for reading whole files in as streams, byte arrays,
@@ -145,7 +146,8 @@ class PortableDataStream(

   private val confBytes = {
     val baos = new ByteArrayOutputStream()
-    context.getConfiguration.write(new DataOutputStream(baos))
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+      write(new DataOutputStream(baos))
     baos.toByteArray
   }

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 3 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.deploy.SparkHadoopUtil

 /**
  * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
@@ -45,7 +46,8 @@ private[spark] class WholeTextFileRecordReader(
   def getConf: Configuration = conf

   private[this] val path = split.getPath(index)
-  private[this] val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val fs = path.getFileSystem(
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context))

   // True means the current file has been processed, then skip it.
   private[this] var processed = false

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 19 additions & 2 deletions
@@ -1160,15 +1160,32 @@ abstract class RDD[T: ClassTag](
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    //
+    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
+    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
+    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
+    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
+    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
+    //
+    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
+    // same bytecodes for `saveAsTextFile`.
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }

   /**
    * Save this RDD as a compressed text file, using string representations of elements.
    */
   def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
   }
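As a toy illustration of why supplying the Ordering explicitly removes the version dependence (class and method names below are hypothetical, not Spark code): the pair-functions wrapper takes its `Ordering[K]` as an implicit parameter that defaults to `null`, so under Hadoop 1.+ the failed implicit search silently falls back to `null`, while under Hadoop 2.+ `Ordering.ordered` synthesizes an Ordering from `Comparable[NullWritable]`; passing `null` explicitly, as the patch does, makes both compilations identical:

```scala
import scala.reflect.ClassTag

// Toy stand-in for a pair-functions wrapper: the Ordering is an implicit
// parameter that defaults to null when no Ordering[K] can be found.
class ToyPairFunctions[K, V](pairs: Seq[(K, V)])(implicit kt: ClassTag[K],
                                                 vt: ClassTag[V],
                                                 ord: Ordering[K] = null) {
  def sortedIfPossible: Seq[(K, V)] =
    if (ord != null) pairs.sortBy(_._1)(ord) else pairs
}

// With a key type whose Comparable is parameterized (as NullWritable is in
// Hadoop 2.+), the compiler fills `ord` via Ordering.ordered; with a raw
// Comparable (Hadoop 1.+), it falls back to the default null. Passing the
// implicit argument list explicitly makes both cases compile the same way:
val explicit = new ToyPairFunctions(Seq("a" -> 1, "b" -> 2))(
  implicitly[ClassTag[String]], implicitly[ClassTag[Int]], null)
```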
