
Commit 31a2fff

Scalastyle fixes
1 parent fc5099e commit 31a2fff

4 files changed (+80, -61 lines)


core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 4 additions & 1 deletion
@@ -31,7 +31,10 @@ private[python] object PythonHadoopUtil {
     conf
   }
 
-  /** Merges two configurations, returns a copy of left with keys from right overwriting any matching keys in left */
+  /**
+   * Merges two configurations, returns a copy of left with keys from right overwriting
+   * any matching keys in left
+   */
   def mergeConfs(left: Configuration, right: Configuration) = {
     import collection.JavaConversions._
     val copy = new Configuration(left)
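The rewrapped comment describes the merge semantics: copy `left`, then let every key/value pair in `right` overwrite any matching key. A minimal sketch of that behaviour (illustrative only; the helper name `mergeConfsSketch` and the use of `JavaConverters` are assumptions, the committed code uses `JavaConversions`):

import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

// Sketch: copy `left`, then overwrite with every entry from `right`,
// so keys present in both configurations end up with `right`'s value.
def mergeConfsSketch(left: Configuration, right: Configuration): Configuration = {
  val copy = new Configuration(left)
  right.iterator().asScala.foreach { entry =>
    copy.set(entry.getKey, entry.getValue)
  }
  copy
}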

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 54 additions & 48 deletions
@@ -272,8 +272,6 @@ private[spark] object PythonRDD {
     }
   }
 
-  // PySpark / Hadoop InputFormat//
-
   /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
   def sequenceFile[K, V](sc: JavaSparkContext,
       path: String,
@@ -295,14 +293,15 @@ private[spark] object PythonRDD {
    * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
    * key and value class
    */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-      path: String,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -314,16 +313,18 @@ private[spark] object PythonRDD {
   }
 
   /**
-   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
-   * using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+   * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class
    */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -332,12 +333,13 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-      path: Option[String] = None,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      conf: Configuration) = {
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      conf: Configuration) = {
     implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
     implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
     implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
@@ -356,14 +358,15 @@ private[spark] object PythonRDD {
    * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
    * key and value class
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-      path: String,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -375,16 +378,18 @@ private[spark] object PythonRDD {
   }
 
   /**
-   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
-   * using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+   * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
    */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
@@ -393,12 +398,13 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-      path: Option[String] = None,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      conf: Configuration) = {
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      conf: Configuration) = {
     implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
     implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
     implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
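The unchanged context lines in `newAPIHadoopRDDFromClassNames` and `hadoopRDDFromClassNames` show the trick these entry points rely on: the key, value and InputFormat classes arrive from Python as fully qualified class-name strings and are turned into `ClassTag`s at runtime. A small standalone sketch of that pattern (the helper name `classTagFromName` is illustrative, not part of the diff):

import scala.reflect.ClassTag

// Sketch: resolve a class by its fully qualified name and wrap it in a
// ClassTag so generic Hadoop RDD constructors can be instantiated at runtime.
def classTagFromName[T](className: String): ClassTag[T] =
  ClassTag(Class.forName(className)).asInstanceOf[ClassTag[T]]

// e.g. a key class name passed in from PySpark as a plain String:
val kcm = classTagFromName[org.apache.hadoop.io.IntWritable]("org.apache.hadoop.io.IntWritable")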

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 11 additions & 7 deletions
@@ -27,14 +27,15 @@ import scala.util.Failure
 
 /**
  * Utilities for serialization / deserialization between Python and Java, using MsgPack.
- * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
+ * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
+ * Scala objects and primitives
  */
 private[python] object SerDeUtil extends Logging {
 
   /**
-   * Checks whether a Scala object needs to be registered with MsgPack. String, primitives and the standard collections
-   * don't need to be registered as MsgPack takes care of serializing them and registering them throws scary looking
-   * errors (but still works).
+   * Checks whether a Scala object needs to be registered with MsgPack. String, primitives
+   * and the standard collections don't need to be registered as MsgPack takes care of serializing
+   * them and registering them throws scary looking errors (but still works).
    */
   def needsToBeRegistered[T](t: T) = {
     t match {
@@ -101,8 +102,8 @@ private[python] object SerDeUtil extends Logging {
   }
 
   /**
-   * Converts an RDD of key-value pairs, where key and/or value could be instances of [[org.apache.hadoop.io.Writable]],
-   * into an RDD[(K, V)]
+   * Converts an RDD of key-value pairs, where key and/or value could be instances of
+   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
    */
   def convertRDD[K, V](rdd: RDD[(K, V)]) = {
     rdd.map{
@@ -113,7 +114,9 @@ private[python] object SerDeUtil extends Logging {
     }
   }
 
-  /** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or object representation */
+  /** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
+   * object representation
+   */
   def convert(writable: Writable): Any = {
     import collection.JavaConversions._
     writable match {
@@ -132,3 +135,4 @@ private[python] object SerDeUtil extends Logging {
   }
 
 }
+
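The rewrapped doc comment above belongs to `convert`, whose body (`writable match { ... }`) is only partly visible in the hunk. As a rough, hedged sketch of that kind of Writable-to-primitive conversion (the exact cases handled by the committed method are not shown in this diff):

import org.apache.hadoop.io._

// Sketch only: unwrap common Writable types to primitives / Strings,
// falling back to the Writable itself for anything unrecognised.
def convertSketch(writable: Writable): Any = writable match {
  case iw: IntWritable     => iw.get()
  case dw: DoubleWritable  => dw.get()
  case t: Text             => t.toString
  case bw: BooleanWritable => bw.get()
  case byw: BytesWritable  => byw.getBytes
  case _: NullWritable     => null
  case other               => other
}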

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala

Lines changed: 11 additions & 5 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
 import org.apache.hadoop.io._
 import scala.Array
 import java.io.{DataOutput, DataInput}
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 
 /**
  * A class to test MsgPack serialization on the Scala side, that will be deserialized
@@ -66,15 +67,20 @@ object WriteInputFormatTestDataGenerator extends App {
   val boolPath = s"$basePath/sfbool"
   val nullPath = s"$basePath/sfnull"
 
-  // Create test data for IntWritable, DoubleWritable, Text, BytesWritable, BooleanWritable and NullWritable
+  /*
+   * Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
+   * BooleanWritable and NullWritable
+   */
   val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
   sc.parallelize(intKeys).saveAsSequenceFile(intPath)
   sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
   sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
   sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
   val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
   sc.parallelize(bools).saveAsSequenceFile(boolPath)
-  sc.parallelize(intKeys).map{ case (k, v) => (new IntWritable(k), NullWritable.get()) }.saveAsSequenceFile(nullPath)
+  sc.parallelize(intKeys).map{ case (k, v) =>
+    (new IntWritable(k), NullWritable.get())
+  }.saveAsSequenceFile(nullPath)
 
   // Create test data for ArrayWritable
   val data = Seq(
@@ -86,7 +92,7 @@ object WriteInputFormatTestDataGenerator extends App {
     .map{ case (k, v) =>
       (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
     }
-    .saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+    .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
 
   // Create test data for MapWritable, with keys DoubleWritable and values Text
   val mapData = Seq(
@@ -116,6 +122,6 @@ object WriteInputFormatTestDataGenerator extends App {
   val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
   rdd.saveAsNewAPIHadoopFile(classPath,
     classOf[Text], classOf[TestWritable],
-    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestWritable]])
+    classOf[SequenceFileOutputFormat[Text, TestWritable]])
 
-}
+}
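The generator writes SequenceFiles under `$basePath` that are presumably read back by the PySpark input-format tests. For reference, a hedged Scala sketch of reading one of them back, e.g. the Text/Text data under `sftext` (the local path and the SparkContext setup are illustrative, not taken from the commit):

import org.apache.spark.SparkContext
import org.apache.hadoop.io.Text

// Sketch: read back the Text/Text SequenceFile written by the generator.
// The reader reuses Writable instances, so copy values out with toString.
val sc = new SparkContext("local", "ReadGeneratedData")
val textRDD = sc.sequenceFile("/tmp/sftestdata/sftext", classOf[Text], classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) }
println(textRDD.collect().mkString(", "))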

0 commit comments
