
Commit a17bda2

Merge branch 'master' of https://github.com/apache/spark into SPARK-1930

2 parents: 478ca15 + b3736e3

File tree: 43 files changed (+997, -573 lines)

Note: large commits have some of their content hidden by default, so only a subset of the 43 changed files is shown below.

bin/compute-classpath.sh

Lines changed: 25 additions & 9 deletions
@@ -38,8 +38,10 @@ else
   JAR_CMD="jar"
 fi
 
-# First check if we have a dependencies jar. If so, include binary classes with the deps jar
-if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
+# A developer option to prepend more recently compiled Spark classes
+if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+  echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
+    "classes ahead of assembly." >&2
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
@@ -51,17 +53,31 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
+fi
 
-ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar 2>/dev/null)
+# Use spark-assembly jar from either RELEASE or assembly directory
+if [ -f "$FWDIR/RELEASE" ]; then
+  assembly_folder="$FWDIR"/lib
 else
-  # Else use spark-assembly jar from either RELEASE or assembly directory
-  if [ -f "$FWDIR/RELEASE" ]; then
-    ASSEMBLY_JAR=$(ls "$FWDIR"/lib/spark-assembly*hadoop*.jar 2>/dev/null)
-  else
-    ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar 2>/dev/null)
-  fi
+  assembly_folder="$ASSEMBLY_DIR"
 fi
 
+num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
+if [ "$num_jars" -eq "0" ]; then
+  echo "Failed to find Spark assembly in $assembly_folder"
+  echo "You need to build Spark before running this program."
+  exit 1
+fi
+if [ "$num_jars" -gt "1" ]; then
+  jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
+  echo "Found multiple Spark assembly jars in $assembly_folder:"
+  echo "$jars_list"
+  echo "Please remove all but one jar."
+  exit 1
+fi
+
+ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
+
 # Verify that versions of java used to build the jars and run Spark are compatible
 jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
 if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then

bin/spark-class

Lines changed: 0 additions & 17 deletions
@@ -108,23 +108,6 @@ fi
 export JAVA_OPTS
 # Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
 
-if [ ! -f "$FWDIR/RELEASE" ]; then
-  # Exit if the user hasn't compiled Spark
-  num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar" | wc -l)
-  jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
-  if [ "$num_jars" -eq "0" ]; then
-    echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
-    echo "You need to build Spark before running this program." >&2
-    exit 1
-  fi
-  if [ "$num_jars" -gt "1" ]; then
-    echo "Found multiple Spark assembly jars in $FWDIR/assembly/target/scala-$SCALA_VERSION:" >&2
-    echo "$jars_list"
-    echo "Please remove all but one jar."
-    exit 1
-  fi
-fi
-
 TOOLS_DIR="$FWDIR"/tools
 SPARK_TOOLS_JAR=""
 if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then

core/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -67,6 +67,12 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+      <version>3.3</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 12 additions & 8 deletions
@@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   private val loading = new HashSet[RDDBlockId]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+  def getOrCompute[T](
+      rdd: RDD[T],
+      split: Partition,
+      context: TaskContext,
       storageLevel: StorageLevel): Iterator[T] = {
+
     val key = RDDBlockId(rdd.id, split.index)
-    logDebug("Looking for partition " + key)
+    logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
       case Some(values) =>
         // Partition is already materialized, so just return its values
@@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
+            logInfo(s"Another thread is loading $key, waiting for it to finish...")
             while (loading.contains(key)) {
               try {
                 loading.wait()
@@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                 logWarning(s"Got an exception while waiting for another thread to load $key", e)
               }
             }
-            logInfo("Finished waiting for %s".format(key))
+            logInfo(s"Finished waiting for $key")
             /* See whether someone else has successfully loaded it. The main way this would fail
              * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
              * partition but we didn't want to make space for it. However, that case is unlikely
@@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
               case None =>
-                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
+                logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
                 loading.add(key)
             }
           } else {
@@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       }
       try {
         // If we got here, we have to load the split
-        logInfo("Partition %s not found, computing it".format(key))
+        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
 
        // Persist the result, so long as the task is not running locally
@@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           case Some(values) =>
             values.asInstanceOf[Iterator[T]]
           case None =>
-            logInfo("Failure to store %s".format(key))
-            throw new Exception("Block manager failed to return persisted valued")
+            logInfo(s"Failure to store $key")
+            throw new SparkException("Block manager failed to return persisted value")
         }
       } else {
         // In this case the RDD is cached to an array buffer. This will save the results

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 3 deletions
@@ -290,14 +290,17 @@ class SparkContext(config: SparkConf) extends Logging {
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
     executorEnvs(envKey) = value
   }
+  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+    executorEnvs("SPARK_PREPEND_CLASSES") = v
+  }
   // The Mesos scheduler backend relies on this environment variable to set executor memory.
   // TODO: Set this only in the Mesos scheduler.
   executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
   executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
   val sparkUser = Option {
-    Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
+    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
   }.getOrElse {
     SparkContext.SPARK_UNKNOWN_USER
   }
@@ -431,12 +434,21 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Methods for creating RDDs
 
-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
+   * altered after the call to parallelize and before the first action on the
+   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
+   * the argument to avoid this.
+   */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
 
-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * This method is identical to `parallelize`.
+   */
   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     parallelize(seq, numSlices)
   }
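The new @note on parallelize is easy to miss, so here is a minimal, hypothetical sketch of the behaviour it describes. The object name, the data, and the local master setting are illustrative assumptions, not part of this commit.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: parallelize keeps a reference to `seq`; it does not copy it.
object ParallelizeNoteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelize-note").setMaster("local[2]"))

    val data = ArrayBuffer(1, 2, 3)
    val rdd = sc.parallelize(data)              // no copy of `data` is taken here
    data += 4                                   // mutation before the first action on `rdd`
    println(rdd.collect().mkString(","))        // may reflect the mutation: 1,2,3,4

    val pinned = sc.parallelize(data.toVector)  // pass a copy to freeze the contents
    data += 5
    println(pinned.collect().mkString(","))     // still 1,2,3,4

    sc.stop()
  }
}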

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 44 additions & 0 deletions
@@ -228,6 +228,50 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     : PartialResult[java.util.Map[K, BoundedDouble]] =
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
 
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
+      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
+   * The former operation is used for merging values within a partition, and the latter is used for
+   * merging values between partitions. To avoid memory allocation, both of these functions are
+   * allowed to modify and return their first argument instead of creating a new U.
+   */
+  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
+      JavaPairRDD[K, U] = {
+    implicit val ctag: ClassTag[U] = fakeClassTag
+    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
+  }
+
   /**
    * Merge the values for each key using an associative function and a neutral "zero value" which
    * may be added to the result an arbitrary number of times, and must not change the result

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 50 additions & 0 deletions
@@ -118,6 +118,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
+      combOp: (U, U) => U): RDD[(K, U)] = {
+    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    zeroBuffer.get(zeroArray)
+
+    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
+
+    combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
+      combOp: (U, U) => U): RDD[(K, U)] = {
+    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
+  }
+
+  /**
+   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
+   * This function can return a different result type, U, than the type of the values in this RDD,
+   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
+   * as in scala.TraversableOnce. The former operation is used for merging values within a
+   * partition, and the latter is used for merging values between partitions. To avoid memory
+   * allocation, both of these functions are allowed to modify and return their first argument
+   * instead of creating a new U.
+   */
+  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
+      combOp: (U, U) => U): RDD[(K, U)] = {
+    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
+  }
+
   /**
    * Merge the values for each key using an associative function and a neutral "zero value" which
    * may be added to the result an arbitrary number of times, and must not change the result
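For context, a hedged usage sketch of the new aggregateByKey API: it assumes an already-constructed SparkContext named sc, and the data and variable names are illustrative only.

val scores = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 3), ("b", 2)))

// Per-key (sum, count) accumulators, combined into averages.
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // seqOp: fold a value into the partition-local accumulator
  (x, y) => (x._1 + y._1, x._2 + y._2))    // combOp: merge accumulators across partitions

val avgByKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
// avgByKey.collect() would give something like Array((a,2.0), (b,3.0)); ordering may vary.

Note the design choice visible in the partitioner overload above: zeroValue is serialized once and a fresh clone is deserialized for each key (the createZero() indirection), which is what makes mutable zero values safe to pass.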

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 32 additions & 22 deletions
@@ -42,7 +42,7 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -378,46 +378,56 @@ abstract class RDD[T: ClassTag](
     }.toArray
   }
 
-  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] =
-  {
-    var fraction = 0.0
-    var total = 0
-    val multiplier = 3.0
-    val initialCount = this.count()
-    var maxSelected = 0
+  /**
+   * Return a fixed-size sampled subset of this RDD in an array
+   *
+   * @param withReplacement whether sampling is done with replacement
+   * @param num size of the returned sample
+   * @param seed seed for the random number generator
+   * @return sample of specified size in an array
+   */
+  def takeSample(withReplacement: Boolean,
+      num: Int,
+      seed: Long = Utils.random.nextLong): Array[T] = {
+    val numStDev = 10.0
 
     if (num < 0) {
       throw new IllegalArgumentException("Negative number of elements requested")
+    } else if (num == 0) {
+      return new Array[T](0)
     }
 
+    val initialCount = this.count()
     if (initialCount == 0) {
       return new Array[T](0)
     }
 
-    if (initialCount > Integer.MAX_VALUE - 1) {
-      maxSelected = Integer.MAX_VALUE - 1
-    } else {
-      maxSelected = initialCount.toInt
+    val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
+    if (num > maxSampleSize) {
+      throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
+        s"$numStDev * math.sqrt(Int.MaxValue)")
     }
 
-    if (num > initialCount && !withReplacement) {
-      total = maxSelected
-      fraction = multiplier * (maxSelected + 1) / initialCount
-    } else {
-      fraction = multiplier * (num + 1) / initialCount
-      total = num
+    val rand = new Random(seed)
+    if (!withReplacement && num >= initialCount) {
+      return Utils.randomizeInPlace(this.collect(), rand)
     }
 
-    val rand = new Random(seed)
+    val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
+      withReplacement)
+
     var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
 
     // If the first sample didn't turn out large enough, keep trying to take samples;
     // this shouldn't happen often because we use a big multiplier for the initial size
-    while (samples.length < total) {
+    var numIters = 0
+    while (samples.length < num) {
+      logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
       samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+      numIters += 1
     }
 
-    Utils.randomizeInPlace(samples, rand).take(total)
+    Utils.randomizeInPlace(samples, rand).take(num)
   }
 
   /**
@@ -1180,7 +1190,7 @@ abstract class RDD[T: ClassTag](
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
-  private[spark] def getCreationSite: String = creationSiteInfo.toString
+  private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
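A short, hypothetical driver snippet showing the revised takeSample contract; it assumes an existing SparkContext named sc, and the values are made up.

val data = sc.parallelize(1 to 1000)

// Without replacement: at most `num` distinct elements. If num >= count, the whole
// dataset is collected, shuffled, and returned (the new early-return path above).
val noReplace = data.takeSample(withReplacement = false, num = 10, seed = 42L)

// With replacement: `num` elements, possibly containing duplicates.
val withReplace = data.takeSample(withReplacement = true, num = 10, seed = 42L)

// Edge cases per the new checks: num = 0 returns an empty array,
// and a negative num throws IllegalArgumentException.
val empty = data.takeSample(withReplacement = false, num = 0)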

Comments (0)