
Commit 6645a8a

Merge branch 'master' of github.com:apache/spark into them-rdd-memories
2 parents: b7e165c + afd757a

93 files changed: +1829 −180 lines

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false)
+    val mapSideCombine: Boolean = false,
+    val sortOrder: Option[SortOrder] = None)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 5 deletions
@@ -1037,7 +1037,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def getCallSite(): CallSite = {
     Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, long = "")
+      case Some(callSite) => CallSite(callSite, longForm = "")
       case None => Utils.getCallSite
     }
   }
@@ -1059,11 +1059,12 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1144,11 +1145,12 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
 
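For reference, the CallSite helper touched throughout this commit now exposes shortForm/longForm instead of short/long. The shape implied by these call sites appears to be roughly the following; this is a sketch inferred from the diff, not the actual definition (which lives alongside org.apache.spark.util.Utils):

// Sketch only, inferred from CallSite(callSite, longForm = ""), callSite.shortForm
// and callSite.longForm above: a one-line summary for log messages plus a longer,
// stack-trace-style description for stage details.
private[spark] case class CallSite(shortForm: String, longForm: String)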

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 1 deletion
@@ -57,7 +57,10 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+    val localdir = env.blockManager.diskBlockManager.localDirs.map(
+      f => f.getPath()).mkString(",")
+    val worker: Socket = env.createPythonWorker(pythonExec,
+      envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir))
 
     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 9 additions & 7 deletions
@@ -25,7 +25,7 @@ import scala.language.existentials
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  */
 @DeveloperApi
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
   // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
   // CoGroupValue is the intermediate state of each value before being merged in compute.
-  private type CoGroup = ArrayBuffer[Any]
+  private type CoGroup = CompactBuffer[Any]
   private type CoGroupValue = (Any, Int) // Int is dependency number
-  private type CoGroupCombiner = Seq[CoGroup]
+  private type CoGroupCombiner = Array[CoGroup]
 
   private var serializer: Option[Serializer] = None
 
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override val partitioner: Some[Partitioner] = Some(part)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           getCombiner(kv._1)(depNum) += kv._2
         }
       }
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
       rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     }
   }
 
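The user-visible effect is that each co-grouped key now maps to an Array with one Iterable per input RDD, and each group is accumulated in a CompactBuffer, an append-only buffer that is cheaper than ArrayBuffer when a key has only a few values. A minimal sketch of how the new result type is consumed; the object name, sample data, and local[2] master are illustrative, while CoGroupedRDD and the Array pattern match come from this diff:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // PairRDDFunctions implicits (mapValues)
import org.apache.spark.rdd.CoGroupedRDD

object CoGroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cogroup-sketch").setMaster("local[2]"))
    val left = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val right = sc.parallelize(Seq(("a", 1.0), ("b", 2.0)))

    // Each value is now an Array[Iterable[_]] with one Iterable per input RDD,
    // mirroring the pattern matches in PairRDDFunctions.cogroup below.
    val cg = new CoGroupedRDD[String](Seq(left, right), new HashPartitioner(2))
    val joined = cg.mapValues { case Array(vs, ws) =>
      (vs.asInstanceOf[Iterable[Int]], ws.asInstanceOf[Iterable[Double]])
    }
    joined.collect().foreach(println)
    sc.stop()
  }
}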

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 8 additions & 9 deletions
@@ -57,14 +57,13 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    */
   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
     val part = new RangePartitioner(numPartitions, self, ascending)
-    val shuffled = new ShuffledRDD[K, V, V, P](self, part).setKeyOrdering(ordering)
-    shuffled.mapPartitions(iter => {
-      val buf = iter.toArray
-      if (ascending) {
-        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
-      } else {
-        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
-      }
-    }, preservesPartitioning = true)
+    new ShuffledRDD[K, V, V, P](self, part)
+      .setKeyOrdering(ordering)
+      .setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
   }
 }
+
+private[spark] object SortOrder extends Enumeration {
+  type SortOrder = Value
+  val ASCENDING, DESCENDING = Value
+}
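
End to end, the requested ordering is now carried through the shuffle itself (sortByKey → ShuffledRDD.setSortOrder → ShuffleDependency.sortOrder) instead of being applied by a follow-up mapPartitions pass. The public API is unchanged, and SortOrder itself stays private[spark]. A minimal usage sketch; the object name, toy data, and local[2] master are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // OrderedRDDFunctions implicits

object SortByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sortbykey-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("b", 2), ("c", 3), ("a", 1)))

    // Same call as before this patch; the descending order is now recorded as
    // SortOrder.DESCENDING on the ShuffleDependency rather than re-sorted after
    // the shuffle in an extra mapPartitions pass.
    val sorted = pairs.sortByKey(ascending = false)
    sorted.collect().foreach(println)   // (c,3), (b,2), (a,1)
    sc.stop()
  }
}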

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 17 additions & 16 deletions
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    val createCombiner = (v: V) => ArrayBuffer(v)
-    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
-    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
-    val bufs = combineByKey[ArrayBuffer[V]](
+    val createCombiner = (v: V) => CompactBuffer(v)
+    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+    val bufs = combineByKey[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
-    bufs.mapValues(_.toIterable)
+    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }
 
   /**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]],
-        w3s.asInstanceOf[Seq[W3]])
+    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+      (vs.asInstanceOf[Iterable[V]],
+        w1s.asInstanceOf[Iterable[W1]],
+        w2s.asInstanceOf[Iterable[W2]],
+        w3s.asInstanceOf[Iterable[W3]])
     }
   }
 
@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, w1s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+    cg.mapValues { case Array(vs, w1s) =>
+      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
     }
   }
 
@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+    cg.mapValues { case Array(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Iterable[V]],
+        w1s.asInstanceOf[Iterable[W1]],
+        w2s.asInstanceOf[Iterable[W2]])
     }
   }
 
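groupByKey keeps its RDD[(K, Iterable[V])] signature; only the backing buffer changes, so existing callers are unaffected. A small usage sketch; the object name, sample data, and local[2] master are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // PairRDDFunctions implicits

object GroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupbykey-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // Still RDD[(String, Iterable[Int])]; each Iterable is now backed by a
    // CompactBuffer instead of an ArrayBuffer.
    val grouped = pairs.groupByKey()
    grouped.mapValues(_.sum).collect().foreach(println)   // (a,3), (b,3)
    sc.stop()
  }
}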

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 6 additions & 4 deletions
@@ -354,7 +354,7 @@ abstract class RDD[T: ClassTag](
   def sample(withReplacement: Boolean,
       fraction: Double,
       seed: Long = Utils.random.nextLong): RDD[T] = {
-    require(fraction >= 0.0, "Invalid fraction value: " + fraction)
+    require(fraction >= 0.0, "Negative fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
     } else {
@@ -754,14 +754,16 @@ abstract class RDD[T: ClassTag](
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**
@@ -1223,7 +1225,7 @@ abstract class RDD[T: ClassTag](
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = Utils.getCallSite
-  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
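foreach and foreachPartition now pass the user function through sc.clean, as map and the other operators already do, so closures that accidentally capture unused, non-serializable enclosing state are stripped before the task is serialized and shipped. A minimal sketch of the two actions; the object name, data, and local[2] master are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object ForeachSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("foreach-sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 10, 2)

    // Both actions now run the supplied function through sc.clean before sc.runJob,
    // matching the closure-cleaning behavior of transformations such as map.
    rdd.foreach(x => println(s"saw $x"))
    rdd.foreachPartition(iter => println(s"partition size = ${iter.size}"))

    sc.stop()
  }
}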

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 11 additions & 1 deletion
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -51,6 +52,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
 
   private var mapSideCombine: Boolean = false
 
+  private var sortOrder: Option[SortOrder] = None
+
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
     this.serializer = Option(serializer)
@@ -75,8 +78,15 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
     this
   }
 
+  /** Set sort order for RDD's sorting. */
+  def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
+    this.sortOrder = Option(sortOrder)
+    this
+  }
+
   override def getDependencies: Seq[Dependency[_]] = {
-    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
+    List(new ShuffleDependency(prev, part, serializer,
+      keyOrdering, aggregator, mapSideCombine, sortOrder))
   }
 
   override val partitioner = Some(part)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 7 deletions
@@ -455,7 +455,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite.short)
+        logInfo("Failed to run " + callSite.shortForm)
         throw exception
     }
   }
@@ -679,7 +679,7 @@ class DAGScheduler(
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite.short, partitions.length, allowLocal))
+      job.jobId, callSite.shortForm, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
@@ -710,7 +710,6 @@ class DAGScheduler(
         if (missing == Nil) {
           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
           submitMissingTasks(stage, jobId.get)
-          runningStages += stage
         } else {
           for (parent <- missing) {
             submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
       null
     }
 
-    // must be run listener before possible NotSerializableException
-    // should be "StageSubmitted" first and then "JobEnded"
-    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
-
     if (tasks.size > 0) {
+      runningStages += stage
+      // SparkListenerStageSubmitted should be posted before testing whether tasks are
+      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
+      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
+      // event.
+      listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
+
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
       // down the road, where we have several different implementations for local scheduler and
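
The reordering preserves the listener-facing invariant that a SparkListenerStageCompleted for a stage is always preceded by its SparkListenerStageSubmitted, even when the stage fails early because a task is not serializable. An illustrative listener that relies on that invariant; this is a sketch only, written against the 1.x SparkListener trait, and the class name is made up:

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Sketch only: tracks stages it believes are running. If a StageCompleted event could
// arrive without a prior StageSubmitted, this bookkeeping would silently drift.
class RunningStagesListener extends SparkListener {
  private val running = mutable.Set[Int]()

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    running += stageSubmitted.stageInfo.stageId
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    running -= stageCompleted.stageInfo.stageId
  }

  def runningStageCount: Int = running.size
}

// Registered on the driver with sc.addSparkListener(new RunningStagesListener).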

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 2 additions & 2 deletions
@@ -108,8 +108,8 @@ private[spark] class Stage(
 
   def attemptId: Int = nextAttemptId
 
-  val name = callSite.short
-  val details = callSite.long
+  val name = callSite.shortForm
+  val details = callSite.longForm
 
   override def toString = "Stage " + id
 