Commit 951c117

Merge branch 'master' into pyspark-inputformats
Conflicts: project/SparkBuild.scala
2 parents: f6aac55 + 28238c8

22 files changed: +216 −136 lines


README.md

Lines changed: 24 additions & 11 deletions

@@ -10,20 +10,33 @@ guide, on the project webpage at <http://spark.apache.org/documentation.html>.
 This README file only contains basic setup instructions.
 
 
-## Building
+## Building Spark
 
-Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
-which can be obtained [here](http://www.scala-sbt.org). If SBT is installed we
-will use the system version of sbt otherwise we will attempt to download it
-automatically. To build Spark and its example programs, run:
+Spark is built on Scala 2.10. To build Spark and its example programs, run:
 
     ./sbt/sbt assembly
 
-Once you've built Spark, the easiest way to start using it is the shell:
+## Interactive Scala Shell
+
+The easiest way to start using Spark is through the Scala shell:
 
     ./bin/spark-shell
 
-Or, for the Python API, the Python shell (`./bin/pyspark`).
+Try the following command, which should return 1000:
+
+    scala> sc.parallelize(1 to 1000).count()
+
+## Interactive Python Shell
+
+Alternatively, if you prefer Python, you can use the Python shell:
+
+    ./bin/pyspark
+
+And run the following command, which should also return 1000:
+
+    >>> sc.parallelize(range(1000)).count()
+
+## Example Programs
 
 Spark also comes with several sample programs in the `examples` directory.
 To run one of them, use `./bin/run-example <class> <params>`. For example:
@@ -38,13 +51,13 @@ All of the Spark samples take a `<master>` parameter that is the cluster URL
 to connect to. This can be a mesos:// or spark:// URL, or "local" to run
 locally with one thread, or "local[N]" to run locally with N threads.
 
-## Running tests
+## Running Tests
 
-Testing first requires [Building](#building) Spark. Once Spark is built, tests
+Testing first requires [building Spark](#building-spark). Once Spark is built, tests
 can be run using:
 
-`./sbt/sbt test`
-
+    ./sbt/sbt test
+
 ## A Note About Hadoop Versions
 
 Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 5 additions & 3 deletions

@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
     private val ascending: Boolean = true)
   extends Partitioner {
 
+  private val ordering = implicitly[Ordering[K]]
+
   // An array of upper bounds for the first (partitions - 1) partitions
   private val rangeBounds: Array[K] = {
     if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
       val rddSize = rdd.count()
       val maxSampleSize = partitions * 20.0
       val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
       if (rddSample.length == 0) {
         Array()
       } else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     var partition = 0
     if (rangeBounds.length < 1000) {
       // If we have less than 100 partitions naive search
-      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
     } else {
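
Note on the change above: RangePartitioner's view bound `K <% Ordered[K]` becomes a context bound `K : Ordering`, so comparisons go through an explicitly summoned `Ordering[K]` (`ordering.gt(...)`) and the sample can be sorted with `.sorted`. The following is a minimal, self-contained sketch of that pattern, not Spark code; every name in it is invented for illustration:

    // Sketch only: mimics how a context bound supplies the comparisons that the
    // old Ordered view bound used to provide.
    object OrderingBoundSketch {
      // Pick (partitions - 1) rough upper bounds from a sorted sample, loosely
      // mirroring how rangeBounds is built.
      def upperBounds[K : Ordering](sample: Seq[K], partitions: Int): Seq[K] = {
        val sorted = sample.sorted   // resolves the implicit Ordering[K]
        (1 until partitions).map(i => sorted((i * sorted.length) / partitions))
      }

      // Linear scan over the bounds, comparing through the summoned Ordering
      // instead of `>` on Ordered.
      def partitionFor[K : Ordering](key: K, bounds: Seq[K]): Int = {
        val ordering = implicitly[Ordering[K]]
        var partition = 0
        while (partition < bounds.length && ordering.gt(key, bounds(partition))) {
          partition += 1
        }
        partition
      }

      def main(args: Array[String]): Unit = {
        val bounds = upperBounds(Seq(5, 1, 9, 3, 7, 2, 8), partitions = 3)
        println(bounds.mkString(", "))    // prints: 3, 7
        println(partitionFor(6, bounds))  // prints: 1
      }
    }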

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 27 additions & 22 deletions

@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString)
   }
 
   /**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    *
-   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, String)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[String],
       classOf[String],
       updateConf,
-      minSplits)
+      minPartitions)
   }
 
   /**
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
-   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param minPartitions Minimum number of Hadoop Splits to generate.
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,15 +525,15 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minPartitions)
   }
 
   /**
    * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
   * values and the InputFormat so that users don't need to pass them directly. Instead, callers
   * can just write, for example,
   * {{{
-   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
   * }}}
   *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int)
+      (path: String, minPartitions: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits)
+        minPartitions)
   }
 
   /**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minPartitions: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
       ): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits)
+      (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def objectFile[T: ClassTag](
       path: String,
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[T] = {
-    sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+    sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
   }
 
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
+  @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
   private val nextShuffleId = new AtomicInteger(0)
 
   private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
@@ -1268,7 +1273,7 @@ object SparkContext extends Logging {
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
       rdd: RDD[(K, V)]) =
     new OrderedRDDFunctions[K, V, (K, V)](rdd)
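
Note on the changes above: this is a mechanical rename of `minSplits` to `minPartitions` throughout SparkContext, with the old `defaultMinSplits` kept as a `@deprecated` alias so existing callers keep compiling. The snippet below is a self-contained sketch of that compatibility pattern, not Spark code; the names are invented for illustration:

    // Sketch only: rename a default-providing method but keep the old name as a
    // deprecated forwarder, the same shape as defaultMinSplits/defaultMinPartitions.
    object RenameSketch {
      def defaultParallelism: Int = 4

      /** New name, used as the default for the renamed parameter below. */
      def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

      /** Old name, kept only for source compatibility. */
      @deprecated("use defaultMinPartitions", "1.0.0")
      def defaultMinSplits: Int = defaultMinPartitions

      /** Stand-in for a reader such as textFile: only the parameter name changes. */
      def readLike(path: String, minPartitions: Int = defaultMinPartitions): String =
        s"would read $path into at least $minPartitions partitions"

      def main(args: Array[String]): Unit = {
        println(readLike("data.txt"))                     // uses the default (2)
        println(readLike("data.txt", minPartitions = 8))  // explicit override
      }
    }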

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 2 additions & 8 deletions

@@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp  // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
   }
 
@@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp  // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
   }
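
Note on the change above: the Java-facing `sortByKey` overloads now just place the caller's `java.util.Comparator` in implicit scope; the standard implicit `scala.math.Ordering.comparatorToOrdering` then derives the `Ordering[K]` that `OrderedRDDFunctions` requires, replacing the hand-written `KeyOrdering` wrapper. A minimal sketch of that mechanism, using only the Scala standard library (names invented for illustration):

    import java.util.Comparator

    // Sketch only: a Comparator in implicit scope satisfies an Ordering context
    // bound via Ordering.comparatorToOrdering.
    object ComparatorToOrderingSketch {
      def smallest[K : Ordering](keys: Seq[K]): K = keys.min

      def smallestWith[K](keys: Seq[K], comp: Comparator[K]): K = {
        implicit val ordering = comp  // picked up by Ordering.comparatorToOrdering
        smallest(keys)
      }

      def main(args: Array[String]): Unit = {
        val byLength = new Comparator[String] {
          override def compare(a: String, b: String): Int = a.length - b.length
        }
        println(smallestWith(Seq("spark", "rdd", "hadoop"), byLength))  // prints: rdd
      }
    }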
