
Commit 04c6f7e

Merge branch 'master' into allocateExecutors
2 parents: aff827c + 219dc00

File tree: 63 files changed (+326 / -123 lines)


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bin/run-example

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
   EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
 fi
 
-./bin/spark-submit \
+"$FWDIR"/bin/spark-submit \
   --master $EXAMPLE_MASTER \
   --class $EXAMPLE_CLASS \
   "$SPARK_EXAMPLES_JAR" \

core/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

@@ -235,7 +235,7 @@
     </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
+      <artifactId>easymockclassextension</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 19 additions & 1 deletion
@@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
     case _ =>
       false
   }
+
+  override def hashCode: Int = numPartitions
 }
 
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
+ *
+ * Note that the actual number of partitions created by the RangePartitioner might not be the same
+ * as the `partitions` parameter, in the case where the number of sampled records is less than
+ * the value of `partitions`.
  */
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
@@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     }
   }
 
-  def numPartitions = partitions
+  def numPartitions = rangeBounds.length + 1
 
   private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
 
@@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     case _ =>
       false
   }
+
+  override def hashCode(): Int = {
+    val prime = 31
+    var result = 1
+    var i = 0
+    while (i < rangeBounds.length) {
+      result = prime * result + rangeBounds(i).hashCode
+      i += 1
+    }
+    result = prime * result + ascending.hashCode
+    result
+  }
 }
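Why the new hashCode overrides matter: Spark compares partitioners (for example, to decide whether a shuffle can be avoided) and may place them in hash-based collections, and the equals/hashCode contract requires that equal partitioners hash identically. A minimal sketch of that contract, assuming only Spark core on the classpath (the demo object name is made up):

import org.apache.spark.HashPartitioner

object PartitionerHashCodeCheck {
  def main(args: Array[String]): Unit = {
    val a = new HashPartitioner(4)
    val b = new HashPartitioner(4)
    assert(a == b)                    // equals already compared numPartitions
    assert(a.hashCode == b.hashCode)  // now holds thanks to the hashCode override
    assert(Set(a, b).size == 1)       // equal partitioners collapse in hash-based sets
  }
}

The RangePartitioner override in the same file follows the same reasoning, folding each range bound and the sort direction into a prime-31 accumulation so it stays consistent with its equals.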

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 4 deletions
@@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString).setName(path)
   }
 
   /**
@@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[String],
       classOf[String],
       updateConf,
-      minPartitions)
+      minPartitions).setName(path)
   }
 
   /**
@@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions)
+      minPartitions).setName(path)
   }
 
   /**
@@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
 
   /**
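These setName(path) calls make the input path the RDD's display name, so the path appears wherever the RDD is rendered (its toString, debug output, the storage UI) instead of an anonymous HadoopRDD. A minimal usage sketch, assuming a local master; the input path and app name below are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object RddNameCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-name-check").setMaster("local[*]"))
    val lines = sc.textFile("/tmp/input.txt")  // hypothetical path
    println(lines.name)                        // after this change: "/tmp/input.txt"
    lines.setName("my input")                  // setName can still rename any RDD explicitly
    println(lines.name)
    sc.stop()
  }
}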

core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala

Lines changed: 2 additions & 0 deletions
@@ -50,4 +50,6 @@ private[spark] class PythonPartitioner(
     case _ =>
       false
   }
+
+  override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode
 }
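The override folds both fields that equals compares (numPartitions and pyPartitionFunctionId) with the usual prime-31 scheme. Purely as a comparison, here is a sketch of an equivalent combination using the JDK's Objects.hash helper; the demo names are made up, and the commit itself keeps the explicit 31 * ... form:

import java.util.Objects

object HashCombineSketch {
  // Order-sensitive combination of the partition count and the Python
  // partition-function id, same shape as the new override.
  def combine(numPartitions: Int, pyPartitionFunctionId: Long): Int =
    Objects.hash(Int.box(numPartitions), Long.box(pyPartitionFunctionId))

  def main(args: Array[String]): Unit =
    println(combine(8, 42L))
}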

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 4 additions & 2 deletions
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
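The new spark.hadoop.validateOutputSpecs setting (default true, as the getBoolean call shows) lets users opt out of the pre-write checkOutputSpecs call, which is what normally rejects a save into an already-existing output directory. A minimal sketch of opting out; the output path and app name are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object OutputSpecsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("output-specs-demo")
      .setMaster("local[*]")
      // Skip the pre-write output check so an existing directory is not a hard error.
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).saveAsTextFile("/tmp/demo-output")  // hypothetical path
    sc.stop()
  }
}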

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 6 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {
 
   import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@ class DAGScheduler(
 
   def markStageAsFinished(stage: Stage) = {
     val serviceTime = stageToInfos(stage).submissionTime match {
-      case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+      case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
       case _ => "Unknown"
     }
     logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-    stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(stage).completionTime = Some(clock.getTime())
     listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
     runningStages -= stage
   }
@@ -1015,7 +1016,7 @@ class DAGScheduler(
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
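Threading a Clock through the constructor (defaulting to SystemClock) is a testability change: stage submission and completion timestamps now come from an injectable source instead of calling System.currentTimeMillis() directly. Spark's own Clock trait is internal, so the sketch below uses stand-in names (DemoClock, FakeClock, StageTimer are made up) to show the pattern of swapping in a deterministic clock for tests:

trait DemoClock {
  def getTime(): Long
}

object DemoSystemClock extends DemoClock {
  override def getTime(): Long = System.currentTimeMillis()
}

class FakeClock(private var now: Long = 0L) extends DemoClock {
  override def getTime(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

object ClockInjectionDemo {
  // A component that timestamps work against whichever clock it is given,
  // analogous to how the scheduler records submission/completion times.
  class StageTimer(clock: DemoClock) {
    def time[A](body: => A): Long = {
      val start = clock.getTime()
      body
      clock.getTime() - start
    }
  }

  def main(args: Array[String]): Unit = {
    val clock = new FakeClock()
    val timer = new StageTimer(clock)
    val elapsed = timer.time { clock.advance(1000) }
    assert(elapsed == 1000L)  // deterministic in tests, unlike wall-clock time
    println(s"elapsed = $elapsed ms")
  }
}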

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
      * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
      * Assume the given iterator is in sorted order.
      */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
         }
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
      *
      * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
      */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0
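The switch from Iterator to BufferedIterator is what makes the corrected loop condition possible: head peeks at the next pair without consuming it, so grouping by minimum key hash stops before a pair from the next hash group is pulled into the current buffer (the old loop tested the pair it had already consumed). A small self-contained sketch of the pattern, with made-up example data:

object BufferedIteratorPeek {
  def main(args: Array[String]): Unit = {
    // .buffered wraps any Iterator so that head peeks without consuming.
    val pairs = Iterator(("a", 1), ("a", 2), ("b", 3)).buffered

    val group = scala.collection.mutable.ArrayBuffer[(String, Int)]()
    val minHash = pairs.head._1.hashCode
    // Stop before consuming a pair whose key hash differs from the group's hash.
    while (pairs.hasNext && pairs.head._1.hashCode == minHash) {
      group += pairs.next()
    }

    println(group)        // ArrayBuffer((a,1), (a,2))
    println(pairs.next()) // (b,3) is still available for the next group
  }
}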
