Commit 2e89461

Merge remote-tracking branch 'upstream/master' into SPARK-2682
2 parents 54e3b66 + 32bcf9a commit 2e89461

File tree

76 files changed: +1095, -73 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 5 deletions
@@ -1037,7 +1037,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def getCallSite(): CallSite = {
     Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, long = "")
+      case Some(callSite) => CallSite(callSite, longForm = "")
       case None => Utils.getCallSite
     }
   }
@@ -1059,11 +1059,12 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }

@@ -1144,11 +1145,12 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
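
Note on the hunks above: getCallSite first consults the thread-local "externalCallSite" property and only then falls back to Utils.getCallSite, so callers can control the shortForm that the scheduler logs. A minimal usage sketch, assuming a live SparkContext named sc; the label string is invented for illustration:

// Override the call site reported for jobs submitted from this thread.
// "externalCallSite" is the property key read by getCallSite above; the
// label itself is a made-up example.
sc.setLocalProperty("externalCallSite", "nightlyAggregation at Pipeline.scala:88")
sc.parallelize(1 to 100).count()               // logged as "Starting job: nightlyAggregation at Pipeline.scala:88"
sc.setLocalProperty("externalCallSite", null)  // remove the override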

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 9 additions & 7 deletions
@@ -25,7 +25,7 @@ import scala.language.existentials
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle

@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  */
 @DeveloperApi
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

   // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
   // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
   // CoGroupValue is the intermediate state of each value before being merged in compute.
-  private type CoGroup = ArrayBuffer[Any]
+  private type CoGroup = CompactBuffer[Any]
   private type CoGroupValue = (Any, Int)  // Int is dependency number
-  private type CoGroupCombiner = Seq[CoGroup]
+  private type CoGroupCombiner = Array[CoGroup]

   private var serializer: Option[Serializer] = None

@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

   override val partitioner: Some[Partitioner] = Some(part)

-  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
          getCombiner(kv._1)(depNum) += kv._2
        }
      }
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    } else {
      val map = createExternalMap(numRdds)
      rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
      }
      context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
  }
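
The switch from ArrayBuffer to CompactBuffer above targets per-key overhead in cogroup: most keys typically carry only a handful of values, yet an ArrayBuffer costs an object header plus a separately allocated backing array even when it holds one element. The sketch below is not Spark's CompactBuffer, just an illustration of the idea under that assumption: keep the first two elements in plain fields and allocate an array only for the third element onward.

class TinyBuffer[T] extends Iterable[T] {
  private var e0: T = _                   // first element kept in a field
  private var e1: T = _                   // second element kept in a field
  private var rest: Array[AnyRef] = null  // allocated lazily for element 3+
  private var curSize = 0

  def +=(value: T): this.type = {
    curSize match {
      case 0 => e0 = value
      case 1 => e1 = value
      case n =>
        if (rest == null) {
          rest = new Array[AnyRef](8)
        } else if (n - 2 == rest.length) {
          rest = java.util.Arrays.copyOf(rest, rest.length * 2)  // grow geometrically
        }
        rest(n - 2) = value.asInstanceOf[AnyRef]
    }
    curSize += 1
    this
  }

  override def size: Int = curSize

  override def iterator: Iterator[T] = new Iterator[T] {
    private var i = 0
    def hasNext: Boolean = i < curSize
    def next(): T = {
      val v = if (i == 0) e0 else if (i == 1) e1 else rest(i - 2).asInstanceOf[T]
      i += 1
      v
    }
  }
}

// val b = new TinyBuffer[Int]; b += 1; b += 2   // still no backing array allocated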

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 17 additions & 16 deletions
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer

 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    val createCombiner = (v: V) => ArrayBuffer(v)
-    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
-    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
-    val bufs = combineByKey[ArrayBuffer[V]](
+    val createCombiner = (v: V) => CompactBuffer(v)
+    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+    val bufs = combineByKey[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
-    bufs.mapValues(_.toIterable)
+    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }

   /**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+       (vs.asInstanceOf[Iterable[V]],
+         w1s.asInstanceOf[Iterable[W1]],
+         w2s.asInstanceOf[Iterable[W2]],
+         w3s.asInstanceOf[Iterable[W3]])
     }
   }

@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, w1s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+    cg.mapValues { case Array(vs, w1s) =>
+      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
     }
   }

@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+    cg.mapValues { case Array(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Iterable[V]],
+        w1s.asInstanceOf[Iterable[W1]],
+        w2s.asInstanceOf[Iterable[W2]])
     }
   }
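
With the changes above, groupByKey and the cogroup variants hand back plain Iterable values (backed internally by CompactBuffer) rather than Seq. A short usage sketch, assuming a SparkContext named sc and the SparkContext._ import for the pair-RDD implicits; code that relied on Seq-specific operations such as indexing would need an explicit .toSeq:

import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// groupByKey yields RDD[(String, Iterable[Int])]
val grouped = pairs.groupByKey()
grouped.mapValues(_.sum).collect()       // Array(("a", 3), ("b", 3)), order not guaranteed

// cogroup likewise exposes an Iterable per input RDD
val other = sc.parallelize(Seq(("a", "x"), ("c", "y")))
pairs.cogroup(other).collect().foreach { case (k, (vs, ws)) =>
  println(s"$k -> ${vs.toList} / ${ws.toList}")   // e.g. a -> List(1, 2) / List(x)
}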

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 6 additions & 4 deletions
@@ -354,7 +354,7 @@ abstract class RDD[T: ClassTag](
   def sample(withReplacement: Boolean,
       fraction: Double,
       seed: Long = Utils.random.nextLong): RDD[T] = {
-    require(fraction >= 0.0, "Invalid fraction value: " + fraction)
+    require(fraction >= 0.0, "Negative fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
     } else {
@@ -754,14 +754,16 @@ abstract class RDD[T: ClassTag](
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

  /**
@@ -1223,7 +1225,7 @@ abstract class RDD[T: ClassTag](

  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
  @transient private[spark] val creationSite = Utils.getCallSite
-  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

  private[spark] def elementClassTag: ClassTag[T] = classTag[T]
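
foreach and foreachPartition now pass their functions through sc.clean before submitting the job, the same treatment map and the other actions already get. The cleaner strips references to enclosing scopes that the closure never uses (the classic offender being REPL line objects), and in releases where clean also pre-checks serializability, a closure that cannot be serialized fails at the call site on the driver. A hedged sketch of that failure mode, assuming a SparkContext named sc:

val lock = new Object                       // java.lang.Object is not Serializable
val rdd  = sc.parallelize(1 to 10)

// The closure genuinely needs `lock`, so cleaning cannot drop it; where clean
// pre-checks serializability, the "Task not serializable" SparkException is
// raised right here at the foreach call rather than deeper in job submission.
try {
  rdd.foreach { x => lock.synchronized { println(x) } }
} catch {
  case e: org.apache.spark.SparkException =>
    println("caught at submission time: " + e.getMessage)
}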

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -455,7 +455,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite.short)
+        logInfo("Failed to run " + callSite.shortForm)
         throw exception
     }
   }
@@ -679,7 +679,7 @@ class DAGScheduler(
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite.short, partitions.length, allowLocal))
+      job.jobId, callSite.shortForm, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 2 additions & 2 deletions
@@ -108,8 +108,8 @@ private[spark] class Stage(

   def attemptId: Int = nextAttemptId

-  val name = callSite.short
-  val details = callSite.long
+  val name = callSite.shortForm
+  val details = callSite.longForm

   override def toString = "Stage " + id

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 2 additions & 0 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage._
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.CompactBuffer

 import scala.reflect.ClassTag

@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
     classOf[GotBlock],
     classOf[GetBlock],
     classOf[MapStatus],
+    classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
     classOf[BoundedPriorityQueue[_]],
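
CompactBuffer joins the classes Spark pre-registers with Kryo: registered classes are written as a small numeric id, while unregistered ones carry their fully qualified class name on every record. The same mechanism is available for user types through the KryoRegistrator hook; the class names below are invented for illustration, and the registrator must be compiled into the application jar.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical user class, made up for this example.
case class ClickEvent(userId: Long, url: String)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Same idea as registering CompactBuffer above: registered classes are
    // encoded as a small id instead of their fully qualified class name.
    kryo.register(classOf[ClickEvent])
    kryo.register(classOf[Array[ClickEvent]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")   // fully qualified name if packaged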

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 3 deletions
@@ -44,7 +44,7 @@ import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /** CallSite represents a place in user code. It can have a short and a long form. */
-private[spark] case class CallSite(short: String, long: String)
+private[spark] case class CallSite(shortForm: String, longForm: String)

 /**
  * Various utility methods used by Spark.
@@ -848,8 +848,8 @@ private[spark] object Utils extends Logging {
     }
     val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
     CallSite(
-      short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
-      long = callStack.take(callStackDepth).mkString("\n"))
+      shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      longForm = callStack.take(callStackDepth).mkString("\n"))
   }

   /** Return a string containing part of a file from byte 'start' to 'end'. */
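
The shortForm built here is the familiar "method at File.scala:line" label that becomes the stage name and appears in the job log lines touched earlier in this commit, while longForm holds the truncated stack trace shown as stage details. A toy rendering of the format string with invented values (CallSite itself is private[spark], so only the format comes from the diff above):

// Invented example values; only the "%s at %s:%s" format is from the diff.
val lastSparkMethod = "reduceByKey"
val firstUserFile   = "WordCount.scala"
val firstUserLine   = 17
val shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
println(shortForm)   // reduceByKey at WordCount.scala:17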
