
Commit 8b967ae

Merge branch 'master' into SPARK-1930

2 parents: 655a820 + 219dc00

24 files changed: +118 lines, -70 lines


bin/run-example

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
   EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
 fi
 
-./bin/spark-submit \
+"$FWDIR"/bin/spark-submit \
   --master $EXAMPLE_MASTER \
   --class $EXAMPLE_CLASS \
   "$SPARK_EXAMPLES_JAR" \

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@
     </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
+      <artifactId>easymockclassextension</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
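Switching from easymock to easymockclassextension brings in EasyMock's class-extension support, which can proxy concrete classes rather than only interfaces. A minimal sketch of that capability, assuming the deprecated org.easymock.classextension.EasyMock facade on the test classpath and a hypothetical BlockFetcher class (neither is part of this patch):

    import org.easymock.classextension.EasyMock

    // Hypothetical concrete class with no interface; interface-only mocking cannot
    // proxy it, the class extension can.
    class BlockFetcher {
      def fetch(id: String): Array[Byte] = Array[Byte]()
    }

    val fetcher = EasyMock.createMock(classOf[BlockFetcher])
    EasyMock.expect(fetcher.fetch("block-1")).andReturn(Array[Byte](1, 2, 3))
    EasyMock.replay(fetcher)
    assert(fetcher.fetch("block-1").length == 3)
    EasyMock.verify(fetcher)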

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 19 additions & 1 deletion
@@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
     case _ =>
       false
   }
+
+  override def hashCode: Int = numPartitions
 }
 
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
+ *
+ * Note that the actual number of partitions created by the RangePartitioner might not be the same
+ * as the `partitions` parameter, in the case where the number of sampled records is less than
+ * the value of `partitions`.
  */
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
@@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     }
   }
 
-  def numPartitions = partitions
+  def numPartitions = rangeBounds.length + 1
 
   private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
 
@@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     case _ =>
       false
   }
+
+  override def hashCode(): Int = {
+    val prime = 31
+    var result = 1
+    var i = 0
+    while (i < rangeBounds.length) {
+      result = prime * result + rangeBounds(i).hashCode
+      i += 1
+    }
+    result = prime * result + ascending.hashCode
+    result
+  }
 }
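These hashCode overrides keep both partitioners consistent with their existing equals implementations, which matters wherever a Partitioner is compared (for example when deciding whether two RDDs are co-partitioned) or stored in a hash-based collection. A small sketch of the contract now honored, written against the public Spark API rather than taken from the patch:

    import org.apache.spark.HashPartitioner

    val a = new HashPartitioner(4)
    val b = new HashPartitioner(4)

    // equals was already overridden; with hashCode defined as numPartitions the two
    // objects now also agree on their hash, as the equals/hashCode contract requires.
    assert(a == b)
    assert(a.hashCode == b.hashCode)
    assert(Set[AnyRef](a).contains(b))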

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 4 deletions
@@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString).setName(path)
   }
 
   /**
@@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[String],
       classOf[String],
       updateConf,
-      minPartitions)
+      minPartitions).setName(path)
   }
 
   /**
@@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions)
+      minPartitions).setName(path)
   }
 
   /**
@@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
 
   /**
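Each of these input methods now tags the resulting RDD with its input path via setName, so the path shows up in the RDD's toString and in the storage page of the web UI rather than an anonymous RDD id. A rough illustration with a hypothetical path:

    // After this change the RDD's name is the path it was loaded from.
    val lines = sc.textFile("hdfs:///data/events.log")
    println(lines.name)       // hdfs:///data/events.log
    println(lines.toString)   // the name is printed ahead of the RDD type and id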

core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala

Lines changed: 2 additions & 0 deletions
@@ -50,4 +50,6 @@ private[spark] class PythonPartitioner(
     case _ =>
       false
   }
+
+  override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode
 }

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{PatienceConfiguration, Eventually}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
@@ -76,7 +76,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     tester.assertCleanup()
 
     // Verify that shuffles can be re-executed after cleaning up
-    assert(rdd.collect().toList === collected)
+    assert(rdd.collect().toList.equals(collected))
   }
 
   test("cleanup broadcast") {
@@ -285,7 +285,7 @@ class CleanerTester(
   sc.cleaner.get.attachListener(cleanerListener)
 
   /** Assert that all the stuff has been cleaned up */
-  def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
+  def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
     try {
       eventually(waitTimeout, interval(100 millis)) {
         assert(isAllCleanedUp)
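The import and signature changes track the ScalaTest 2.x API, where the Timeout accepted by eventually(...) lives in PatienceConfiguration rather than in Eventually. A minimal standalone sketch of the same pattern (the awaitTrue helper is hypothetical, not part of the suite):

    import scala.language.postfixOps
    import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Same shape as assertCleanup(): the implicit timeout is typed against
    // PatienceConfiguration.Timeout and handed straight to eventually(...).
    def awaitTrue(condition: () => Boolean)(implicit waitTimeout: PatienceConfiguration.Timeout) {
      eventually(waitTimeout, interval(100 millis)) {
        assert(condition())
      }
    }

    awaitTrue(() => true)(timeout(10 seconds))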

core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -23,11 +23,11 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
 
   // This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
 
-  override def beforeAll(configMap: Map[String, Any]) {
+  override def beforeAll() {
     System.setProperty("spark.shuffle.use.netty", "true")
   }
 
-  override def afterAll(configMap: Map[String, Any]) {
+  override def afterAll() {
     System.setProperty("spark.shuffle.use.netty", "false")
   }
 }

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -266,8 +266,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
     // we can optionally shuffle to keep the upstream parallel
     val coalesced5 = data.coalesce(1, shuffle = true)
-    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
-      null)
+    val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
+      asInstanceOf[ShuffledRDD[_, _, _]] != null
+    assert(isEquals)
 
     // when shuffling, we can increase the number of partitions
     val coalesced6 = data.coalesce(20, shuffle = true)

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import scala.language.reflectiveCalls
 
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -37,7 +37,7 @@ class BuggyDAGEventProcessActor extends Actor {
   }
 }
 
-class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
   with ImplicitSender with BeforeAndAfter with LocalSparkContext {
 
   val conf = new SparkConf
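FunSuite is a class in ScalaTest 2.x, so a suite that already extends another class (here akka's TestKit) has to mix in the FunSuiteLike trait instead. A standalone sketch of the same pattern, assuming akka-testkit on the test classpath (EchoSuite is a made-up name):

    import akka.actor.ActorSystem
    import akka.testkit.TestKit
    import org.scalatest.FunSuiteLike

    // TestKit is a class, so the ScalaTest style is supplied as a trait (FunSuiteLike),
    // not as the FunSuite class.
    class EchoSuite extends TestKit(ActorSystem("EchoSuite")) with FunSuiteLike {
      test("the test kit exposes its actor system") {
        assert(system.name === "EchoSuite")
      }
    }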

core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -105,7 +105,8 @@ class TimeStampedHashMapSuite extends FunSuite {
     map("k1") = strongRef
     map("k2") = "v2"
     map("k3") = "v3"
-    assert(map("k1") === strongRef)
+    val isEquals = map("k1") == strongRef
+    assert(isEquals)
 
     // clear strong reference to "k1"
     strongRef = null
