Commit e570fb5

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-4924

Conflicts:
    bin/compute-classpath.sh
    bin/spark-class

2 parents 44cd5f7 + 9d9294a

160 files changed: +3692, -2874 lines

bin/run-example

Lines changed: 21 additions & 6 deletions
@@ -35,17 +35,32 @@ else
 fi
 
 if [ -f "$FWDIR/RELEASE" ]; then
-  export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`"
-elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
-  export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`"
+  JAR_PATH="${FWDIR}/lib"
+else
+  JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
 fi
 
-if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then
-  echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
-  echo "You need to build Spark before running this program" 1>&2
+JAR_COUNT=0
+
+for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
+  if [[ ! -e "$f" ]]; then
+    echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
+    echo "You need to build Spark before running this program" 1>&2
+    exit 1
+  fi
+  SPARK_EXAMPLES_JAR="$f"
+  JAR_COUNT=$((JAR_COUNT+1))
+done
+
+if [ "$JAR_COUNT" -gt "1" ]; then
+  echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
+  ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
+  echo "Please remove all but one jar." 1>&2
   exit 1
 fi
 
+export SPARK_EXAMPLES_JAR
+
 EXAMPLE_MASTER=${MASTER:-"local[*]"}
 
 if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
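
The rewritten lookup now fails in two distinct ways: when no examples assembly jar matches the glob, and when more than one does. For readers who want the same "exactly one assembly jar" check programmatically, here is a hypothetical Scala analogue; the function and its name are illustration only and are not part of the commit.

import java.io.File

// Hypothetical analogue of the shell logic above: resolve exactly one
// spark-examples assembly jar in a directory, failing on zero or many.
def findSingleExamplesJar(dir: File): File = {
  val jars = Option(dir.listFiles()).getOrElse(Array.empty[File]).filter { f =>
    f.getName.startsWith("spark-examples-") &&
      f.getName.contains("hadoop") &&
      f.getName.endsWith(".jar")
  }
  jars match {
    case Array()    => sys.error(s"Failed to find Spark examples assembly in $dir")
    case Array(jar) => jar
    case many       => sys.error(s"Found multiple Spark examples assembly jars in $dir:\n${many.mkString("\n")}")
  }
}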

JavaSparkListener.java (new file in package org.apache.spark)

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+/**
+ * Java clients should extend this class instead of implementing
+ * SparkListener directly. This is to prevent java clients
+ * from breaking when new events are added to the SparkListener
+ * trait.
+ *
+ * This is a concrete class instead of abstract to enforce
+ * new events get added to both the SparkListener and this adapter
+ * in lockstep.
+ */
+public class JavaSparkListener implements SparkListener {
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+  @Override
+  public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
+
+  @Override
+  public void onJobStart(SparkListenerJobStart jobStart) { }
+
+  @Override
+  public void onJobEnd(SparkListenerJobEnd jobEnd) { }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+  @Override
+  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
+
+  @Override
+  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+}
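
Because every callback in the adapter is a no-op, a client only has to subclass it and override the events it cares about. Below is a minimal sketch, written in Scala simply to keep this document's examples in one language (a Java client would extend JavaSparkListener the same way); the listener body is illustrative, and sc is assumed to be an existing SparkContext.

import org.apache.spark.JavaSparkListener
import org.apache.spark.scheduler.SparkListenerJobEnd

// Illustrative listener: override only the callback of interest; all other
// callbacks keep the no-op implementations inherited from JavaSparkListener.
class JobEndLogger extends JavaSparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
  }
}

// Register it like any other SparkListener (sc is an existing SparkContext).
sc.addSparkListener(new JobEndLogger)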

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 4 additions & 3 deletions
@@ -19,6 +19,7 @@
   height: 50px;
   font-size: 15px;
   margin-bottom: 15px;
+  min-width: 1200px
 }
 
 .navbar .navbar-inner {
@@ -39,12 +40,12 @@
 
 .navbar .nav > li a {
   height: 30px;
-  line-height: 30px;
+  line-height: 2;
 }
 
 .navbar-text {
   height: 50px;
-  line-height: 50px;
+  line-height: 3.3;
 }
 
 table.sortable thead {
@@ -170,7 +171,7 @@ span.additional-metric-title {
 }
 
 .version {
-  line-height: 30px;
+  line-height: 2.5;
   vertical-align: bottom;
   font-size: 12px;
   padding: 0;

core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 4 additions & 4 deletions
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
     // Update task metrics if context is not null
     // TODO: Make context non optional in a future release
     Option(context).foreach { c =>
-      c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-      c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+      c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+      c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
     }
     combiners.iterator
   }
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
     // Update task metrics if context is not null
     // TODO: Make context non-optional in a future release
    Option(context).foreach { c =>
-      c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-      c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+      c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+      c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
    }
     combiners.iterator
   }
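
Both hunks replace direct "+=" updates on task-metrics fields with incrementer calls (incMemoryBytesSpilled, incDiskBytesSpilled). A self-contained sketch of that accessor pattern follows; the SpillMetrics class is a hypothetical stand-in, not Spark's TaskMetrics.

// Hypothetical stand-in showing the pattern: the counter is a private var
// that callers bump through an incrementer instead of mutating directly.
class SpillMetrics {
  private var _memoryBytesSpilled: Long = 0L

  def memoryBytesSpilled: Long = _memoryBytesSpilled
  def incMemoryBytesSpilled(value: Long): Unit = { _memoryBytesSpilled += value }
}

val metrics = new SpillMetrics
metrics.incMemoryBytesSpilled(1024L)
println(metrics.memoryBytesSpilled)  // 1024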

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 5 additions & 1 deletion
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        val inputMetrics = blockResult.inputMetrics
+        val existingMetrics = context.taskMetrics
+          .getInputMetricsForReadMethod(inputMetrics.readMethod)
+        existingMetrics.addBytesRead(inputMetrics.bytesRead)
+
         new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 4 deletions
@@ -520,10 +520,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /** Distribute a local Scala collection to form an RDD.
    *
-   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
-   * altered after the call to parallelize and before the first action on the
-   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
-   * the argument to avoid this.
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+   * modified collection. Pass a copy of the argument to avoid this.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
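
The reflowed @note describes behavior worth seeing once in code. A minimal sketch, assuming sc is an existing SparkContext; the values are illustrative.

import scala.collection.mutable.ArrayBuffer

val data = ArrayBuffer(1, 2, 3)
val rdd = sc.parallelize(data)  // lazy: the collection is not copied here

data += 4                       // mutated before the first action

println(rdd.count())            // per the @note, reflects the mutation: 4, not 3

// Passing a copy of the argument gives a stable snapshot instead:
val snapshot = sc.parallelize(data.toList)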

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 10 additions & 0 deletions
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This
 
@@ -435,6 +439,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def first(): T = rdd.first()
 
+  /**
+   * @return true if and only if the RDD contains no elements at all. Note that an RDD
+   *         may be empty even when it has at least 1 partition.
+   */
+  def isEmpty(): Boolean = rdd.isEmpty()
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
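
The new isEmpty() simply delegates to the underlying Scala RDD, so the same check behaves identically on both APIs. A minimal usage sketch, assuming sc is an existing SparkContext:

val empty    = sc.parallelize(Seq.empty[Int], numSlices = 4)
val nonEmpty = sc.parallelize(Seq(1, 2, 3))

println(empty.isEmpty())     // true: the RDD has 4 partitions but no elements
println(nonEmpty.isEmpty())  // false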

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
             init, finish))
           val memoryBytesSpilled = stream.readLong()
           val diskBytesSpilled = stream.readLong()
-          context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
-          context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+          context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+          context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
           read()
         case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
           // Signals that an exception has been thrown in python

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
 
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}
 
 /**
  * Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
   parse(args.toList)
 
   def parse(args: List[String]): Unit = args match {
-    case ("--cores" | "-c") :: value :: tail =>
-      cores = value.toInt
+    case ("--cores" | "-c") :: IntParam(value) :: tail =>
+      cores = value
       parse(tail)
 
     case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
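
Switching from value.toInt to the IntParam(value) extractor moves integer parsing into the pattern match, so a malformed value simply fails to match rather than throwing NumberFormatException from inside the case body. A self-contained sketch of that extractor pattern follows; the IntArg object is a hypothetical stand-in, not Spark's internal IntParam.

// Hypothetical extractor: matches only strings that parse cleanly as Int.
object IntArg {
  def unapply(str: String): Option[Int] =
    try Some(str.toInt) catch { case _: NumberFormatException => None }
}

def parseCores(args: List[String]): Option[Int] = args match {
  case ("--cores" | "-c") :: IntArg(value) :: _ => Some(value)  // "--cores 4" matches
  case _ => None                                                // "--cores four" does not
}

println(parseCores(List("--cores", "4")))     // Some(4)
println(parseCores(List("--cores", "four")))  // None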

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 0 deletions
@@ -200,6 +200,7 @@ object SparkSubmit {
       // Yarn cluster only
       OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
