
Commit 2beeedb

Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
    core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
    python/pyspark/context.py
2 parents: 795a763 + 3a9d82c


138 files changed (+3248 additions, -1168 deletions)

.gitignore

Lines changed: 1 addition & 1 deletion
@@ -44,4 +44,4 @@ derby.log
 dist/
 spark-*-bin.tar.gz
 unit-tests.log
-lib/
+/lib/

assembly/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -71,6 +71,11 @@
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-graphx_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>

bin/run-example

Lines changed: 11 additions & 2 deletions
@@ -76,11 +76,20 @@ else
   fi
 fi
 
+# Set JAVA_OPTS to be able to load native libraries and to set heap size
+JAVA_OPTS="$SPARK_JAVA_OPTS"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
+# Load extra JAVA_OPTS from conf/java-opts, if it exists
+if [ -e "$FWDIR/conf/java-opts" ] ; then
+  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
+fi
+export JAVA_OPTS
+
 if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
   echo -n "Spark Command: "
-  echo "$RUNNER" -cp "$CLASSPATH" "$@"
+  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
   echo "========================================"
   echo
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

bin/run-example2.cmd

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ if "x%SPARK_EXAMPLES_JAR%"=="x" (
 
 rem Compute Spark classpath using external script
 set DONT_PRINT_CLASSPATH=1
-call "%FWDIR%sbin\compute-classpath.cmd"
+call "%FWDIR%bin\compute-classpath.cmd"
 set DONT_PRINT_CLASSPATH=0
 set CLASSPATH=%SPARK_EXAMPLES_JAR%;%CLASSPATH%
 

bin/spark-shell

Lines changed: 8 additions & 3 deletions
@@ -45,13 +45,18 @@ for o in "$@"; do
 done
 
 # Set MASTER from spark-env if possible
+DEFAULT_SPARK_MASTER_PORT=7077
 if [ -z "$MASTER" ]; then
   if [ -e "$FWDIR/conf/spark-env.sh" ]; then
     . "$FWDIR/conf/spark-env.sh"
   fi
-  if [[ "x" != "x$SPARK_MASTER_IP" && "y" != "y$SPARK_MASTER_PORT" ]]; then
-    MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
-    export MASTER
+  if [ "x" != "x$SPARK_MASTER_IP" ]; then
+    if [ "y" != "y$SPARK_MASTER_PORT" ]; then
+      SPARK_MASTER_PORT="${SPARK_MASTER_PORT}"
+    else
+      SPARK_MASTER_PORT=$DEFAULT_SPARK_MASTER_PORT
+    fi
+    export MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
   fi
 fi
 

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -41,15 +41,15 @@ object Partitioner {
    * spark.default.parallelism is set, then we'll use the value from SparkContext
    * defaultParallelism, otherwise we'll use the max number of upstream partitions.
    *
-   * Unless spark.default.parallelism is set, He number of partitions will be the
+   * Unless spark.default.parallelism is set, the number of partitions will be the
    * same as the number of partitions in the largest upstream RDD, as this should
    * be least likely to cause out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
-    for (r <- bySize if r.partitioner != None) {
+    for (r <- bySize if r.partitioner.isDefined) {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
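 
For context, a minimal usage sketch of the behavior documented above (not part of this diff; the local SparkContext and sample data are placeholder assumptions): defaultPartitioner reuses an upstream RDD's partitioner when one is defined, which is exactly what the r.partitioner.isDefined check looks for.

import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}
import org.apache.spark.SparkContext._   // brings in pair-RDD functions such as partitionBy

val sc = new SparkContext("local", "partitioner-sketch")   // hypothetical local context

val raw    = sc.parallelize(Seq(("a", 1), ("b", 2)), 2)    // no partitioner defined
val hashed = raw.partitionBy(new HashPartitioner(4))       // partitioner is defined

// Reuses hashed's HashPartitioner(4), the only upstream RDD with a defined partitioner.
val chosen: Partitioner = Partitioner.defaultPartitioner(hashed, raw)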

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 1 deletion
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+  def getAkkaConf: Seq[(String, String)] =
+    /* This is currently undocumented. If we want to make this public we should consider
+     * nesting options under the spark namespace to avoid conflicts with user akka options.
+     * Otherwise users configuring their own akka code via system properties could mess up
+     * spark's akka options.
+     *
+     * E.g. spark.akka.option.x.y.x = "value"
+     */
+    getAll.filter {case (k, v) => k.startsWith("akka.")}
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
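 
A minimal usage sketch of the filter above (illustrative only; akka.loglevel is just an example Akka key, not something this commit defines):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("akka.loglevel", "DEBUG")               // returned by getAkkaConf
  .set("spark.app.name", "akka-conf-sketch")   // filtered out by getAkkaConf

// Only keys starting with "akka." survive: Seq(("akka.loglevel", "DEBUG"))
val akkaOnly: Seq[(String, String)] = conf.getAkkaConf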

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 81 additions & 46 deletions
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *   Most RecordReader implementations reuse wrapper objects across multiple
-   *   records, and can cause problems in RDD collect or aggregation operations.
-   *   By default the records are cloned in Spark. However, application
-   *   programmers can explicitly disable the cloning for better performance.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
     ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
     ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -422,68 +429,91 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneRecords = cloneRecords)
+      vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
      path: String,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
-     conf: Configuration = hadoopConfiguration,
-     cloneRecords: Boolean = true): RDD[(K, V)] = {
+     conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
+  def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
      ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   * */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
+      ): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int = defaultMinSplits)
      (implicit km: ClassTag[K], vm: ClassTag[V],
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
      : RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -1024,7 +1059,7 @@ object SparkContext {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
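 
As a concrete illustration of the caching note repeated in the scaladoc above (a sketch only; the path and the local SparkContext are placeholder assumptions): copy the re-used Writables into plain Scala values with a map before caching.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

val sc  = new SparkContext("local", "writable-copy-sketch")   // hypothetical context
val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", 2)

// Caching raw directly would store many references to the same re-used Writable
// instances, so copy each record into immutable values first, then cache.
val safe = raw.map { case (offset, line) => (offset.get, line.toString) }.cache()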

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 13 additions & 0 deletions
@@ -53,3 +53,16 @@ private[spark] case class ExceptionFailure(
 private[spark] case object TaskResultLost extends TaskEndReason
 
 private[spark] case object TaskKilled extends TaskEndReason
+
+/**
+ * The task failed because the executor that it was running on was lost. This may happen because
+ * the task crashed the JVM.
+ */
+private[spark] case object ExecutorLostFailure extends TaskEndReason
+
+/**
+ * We don't know why the task ended -- for example, because of a ClassNotFound exception when
+ * deserializing the task result.
+ */
+private[spark] case object UnknownReason extends TaskEndReason
+
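 
A hedged sketch of how the new failure reasons might be consumed (illustrative only; TaskEndReasonSketch is a hypothetical helper, and it would have to live in the org.apache.spark package because these case objects are private[spark]):

package org.apache.spark

// Hypothetical helper (not part of this commit) showing how the scheduler
// might render the new failure reasons for logs or the UI.
private[spark] object TaskEndReasonSketch {
  def describe(reason: TaskEndReason): String = reason match {
    case ExecutorLostFailure => "executor lost; the task may have crashed the JVM"
    case UnknownReason       => "task ended for an unknown reason"
    case TaskResultLost      => "task finished but its result was lost"
    case TaskKilled          => "task was killed"
    case other               => other.toString
  }
}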

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -49,8 +49,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 
-  override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+  override val classTag: ClassTag[(K, V)] = rdd.elementClassTag
 
   import JavaPairRDD._
 