
Commit a4c387b

Merge branch 'master' of github.com:apache/spark into unroll-them-partitions

2 parents: cf5f565 + f14b00a

131 files changed: +3161 / -591 lines


.rat-excludes (1 addition, 0 deletions)

@@ -22,6 +22,7 @@ spark-env.sh.template
 log4j-defaults.properties
 sorttable.js
 .*txt
+.*json
 .*data
 .*log
 cloudpickle.py

core/src/main/resources/org/apache/spark/ui/static/webui.css (21 additions, 0 deletions)

@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
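These rules style a collapsible details block in the web UI. A minimal sketch, in the Scala XML style the UI pages use, of markup that would pick up these classes; the helper below is hypothetical and not part of this commit:

import scala.xml.Node

// Hypothetical rendering helper, for illustration only: a clickable
// "+details" toggle followed by a details block that starts collapsed.
// A small piece of page JavaScript would flip the "collapsed" class to
// trigger the max-height/padding transition defined above.
object StageDetailsSketch {
  def detailsCell(details: String): Seq[Node] = {
    <span class="expand-details">+details</span> ++
    <pre class="stage-details collapsed">{details}</pre>
  }
}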

core/src/main/scala/org/apache/spark/SparkContext.scala (10 additions, 9 deletions)

@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -224,7 +224,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
     val hadoopConf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
@@ -1036,9 +1035,11 @@ class SparkContext(config: SparkConf) extends Logging {
    * Capture the current user callsite and return a formatted version for printing. If the user
    * has overridden the call site, this will return the user's version.
    */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }
 
   /**
@@ -1058,11 +1059,11 @@
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1143,11 +1144,11 @@
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
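getCallSite now returns a small CallSite value instead of a plain string. Judging from the usages in this hunk (CallSite(callSite, long = "") and callSite.short), it is roughly a two-field case class along these lines; a sketch only, not the exact definition in org.apache.spark.util:

// Sketch inferred from the usages above: a short form for log lines and a
// longer form (e.g. a fuller stack trace) for detail views such as the UI.
case class CallSite(short: String, long: String)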

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala (16 additions, 0 deletions)

@@ -17,10 +17,13 @@
 
 package org.apache.spark.api.java
 
+import java.util.Comparator
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -172,6 +175,19 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
     rdd.setName(name)
     this
   }
+
+  /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
+    import scala.collection.JavaConverters._
+    def fn = (x: T) => f.call(x)
+    import com.google.common.collect.Ordering  // shadows scala.math.Ordering
+    implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
+    implicit val ctag: ClassTag[S] = fakeClassTag
+    wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
+  }
+
 }
 
 object JavaRDD {
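The Java-API method simply delegates to the Scala RDD.sortBy with Guava's natural ordering. A minimal sketch of the equivalent call on the underlying Scala RDD, assuming an existing SparkContext named sc:

// Sketch, assuming a live SparkContext `sc`: sort an RDD by a key function
// (here the identity) into two partitions.
val sorted = sc.parallelize(Seq(3, 1, 2))
  .sortBy(x => x, ascending = true, numPartitions = 2)
// sorted.collect() would return Array(1, 2, 3)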

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala (4 additions, 2 deletions)

@@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
 
+import org.apache.spark.util.MemoryParam
+
 /**
  * Command-line parser for the driver client.
  */
@@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) {
       cores = value.toInt
       parse(tail)
 
-    case ("--memory" | "-m") :: value :: tail =>
-      memory = value.toInt
+    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+      memory = value
      parse(tail)
 
    case ("--supervise" | "-s") :: tail =>

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala (2 additions, 2 deletions)

@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
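With EXITED in both the enumeration and isFinished, an executor that exits cleanly now counts as finished for bookkeeping, while the Master change further below skips the retry/removal path for it. A small illustration, grounded only in the two lines changed here:

// A clean exit is "finished", so the master can clean the executor up
// without treating the application as failed.
import org.apache.spark.deploy.ExecutorState
assert(ExecutorState.isFinished(ExecutorState.EXITED))
assert(!ExecutorState.isFinished(ExecutorState.RUNNING))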

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala (4 additions, 0 deletions)

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(
 
   @transient var state: ApplicationState.Value = _
   @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+  @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
   @transient var coresGranted: Int = _
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
@@ -51,6 +53,7 @@ private[spark] class ApplicationInfo(
     endTime = -1L
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
+    removedExecutors = new ArrayBuffer[ExecutorInfo]
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -74,6 +77,7 @@ private[spark] class ApplicationInfo(
 
   def removeExecutor(exec: ExecutorInfo) {
     if (executors.contains(exec.id)) {
+      removedExecutors += executors(exec.id)
      executors -= exec.id
      coresGranted -= exec.cores
    }

core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala (15 additions, 0 deletions)

@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
   }
 
   def fullId: String = application.id + "/" + id
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case info: ExecutorInfo =>
+        fullId == info.fullId &&
+        worker.id == info.worker.id &&
+        cores == info.cores &&
+        memory == info.memory
+      case _ => false
+    }
+  }
+
+  override def toString: String = fullId
+
+  override def hashCode: Int = toString.hashCode()
 }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala (3 additions, 2 deletions)

@@ -303,10 +303,11 @@ private[spark] class Master(
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus.exists(_ == 0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
               schedule()
-            } else {
+            } else if (!normalExit) {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
               removeApplication(appInfo, ApplicationState.FAILED)

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala (46 additions, 34 deletions)

@@ -25,7 +25,7 @@ import scala.xml.Node
 import akka.pattern.ask
 import org.json4s.JValue
 
-import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
 import org.apache.spark.ui.{WebUIPage, UIUtils}
@@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     })
 
     val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
-    val executors = app.executors.values.toSeq
-    val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+    val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq
+    // This includes executors that are either still running or have exited cleanly
+    val executors = allExecutors.filter { exec =>
+      !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED
+    }
+    val removedExecutors = allExecutors.diff(executors)
+    val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+    val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors)
 
     val content =
-        <div class="row-fluid">
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>ID:</strong> {app.id}</li>
-              <li><strong>Name:</strong> {app.desc.name}</li>
-              <li><strong>User:</strong> {app.desc.user}</li>
-              <li><strong>Cores:</strong>
-              {
-                if (app.desc.maxCores.isEmpty) {
-                  "Unlimited (%s granted)".format(app.coresGranted)
-                } else {
-                  "%s (%s granted, %s left)".format(
-                    app.desc.maxCores.get, app.coresGranted, app.coresLeft)
-                }
-              }
-              </li>
-              <li>
-                <strong>Executor Memory:</strong>
-                {Utils.megabytesToString(app.desc.memoryPerSlave)}
-              </li>
-              <li><strong>Submit Date:</strong> {app.submitDate}</li>
-              <li><strong>State:</strong> {app.state}</li>
-              <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
-            </ul>
-          </div>
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>ID:</strong> {app.id}</li>
+            <li><strong>Name:</strong> {app.desc.name}</li>
+            <li><strong>User:</strong> {app.desc.user}</li>
+            <li><strong>Cores:</strong>
+            {
+              if (app.desc.maxCores.isEmpty) {
+                "Unlimited (%s granted)".format(app.coresGranted)
+              } else {
+                "%s (%s granted, %s left)".format(
+                  app.desc.maxCores.get, app.coresGranted, app.coresLeft)
+              }
+            }
+            </li>
+            <li>
+              <strong>Executor Memory:</strong>
+              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+            </li>
+            <li><strong>Submit Date:</strong> {app.submitDate}</li>
+            <li><strong>State:</strong> {app.state}</li>
+            <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+          </ul>
         </div>
+      </div>
 
-        <div class="row-fluid"> <!-- Executors -->
-          <div class="span12">
-            <h4> Executor Summary </h4>
-            {executorTable}
-          </div>
-        </div>;
+      <div class="row-fluid"> <!-- Executors -->
+        <div class="span12">
+          <h4> Executor Summary </h4>
+          {executorsTable}
+          {
+            if (removedExecutors.nonEmpty) {
+              <h4> Removed Executors </h4> ++
+              removedExecutorsTable
+            }
+          }
+        </div>
+      </div>;
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
