
Commit 4faf5f9

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into gen_defer

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

2 parents: f6139e6 + 9cdd867

File tree: 106 files changed, +3180 -1227 lines


core/src/main/resources/org/apache/spark/ui/static/historypage-template.html

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@
       <tbody>
       {{#applications}}
         <tr>
-          <td class="rowGroupColumn"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></td>
+          <td class="rowGroupColumn"><span title="{{id}}"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></span></td>
           <td class="rowGroupColumn">{{name}}</td>
           {{#attempts}}
             <td class="attemptIDSpan"><a href="/history/{{id}}/{{attemptId}}/jobs/">{{attemptId}}</a></td>
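Note: the title="{{id}}" attribute added here is what the new "appid-numeric-pre" sorter in historypage.js (next file) extracts from the rendered cell, so the App ID column can be sorted numerically rather than lexicographically.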

core/src/main/resources/org/apache/spark/ui/static/historypage.js

Lines changed: 32 additions & 1 deletion
@@ -37,6 +37,22 @@ function formatDuration(milliseconds) {
   return hours.toFixed(1) + " h";
 }

+function makeIdNumeric(id) {
+  var strs = id.split("_");
+  if (strs.length < 3) {
+    return id;
+  }
+  var appSeqNum = strs[2];
+  var resl = strs[0] + "_" + strs[1] + "_";
+  var diff = 10 - appSeqNum.length;
+  while (diff > 0) {
+    resl += "0"; // padding 0 before the app sequence number to make sure it has 10 characters
+    diff--;
+  }
+  resl += appSeqNum;
+  return resl;
+}
+
 function formatDate(date) {
   return date.split(".")[0].replace("T", " ");
 }

@@ -62,6 +78,21 @@ jQuery.extend( jQuery.fn.dataTableExt.oSort, {
   }
 } );

+jQuery.extend( jQuery.fn.dataTableExt.oSort, {
+  "appid-numeric-pre": function ( a ) {
+    var x = a.match(/title="*(-?[0-9a-zA-Z\-\_]+)/)[1];
+    return makeIdNumeric(x);
+  },
+
+  "appid-numeric-asc": function ( a, b ) {
+    return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+  },
+
+  "appid-numeric-desc": function ( a, b ) {
+    return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+  }
+} );
+
 $(document).ajaxStop($.unblockUI);
 $(document).ajaxStart(function(){
   $.blockUI({ message: '<h3>Loading history summary...</h3>'});

@@ -109,7 +140,7 @@ $(document).ready(function() {
   var selector = "#history-summary-table";
   var conf = {
     "columns": [
-      {name: 'first'},
+      {name: 'first', type: "appid-numeric"},
       {name: 'second'},
       {name: 'third'},
       {name: 'fourth'},
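To see why the padding matters: application IDs whose sequence numbers differ in digit count (e.g. "..._9" vs "..._10") sort incorrectly as plain strings. A minimal Scala port of the same normalization (illustrative only; the committed code is the JavaScript above):

    // Illustrative Scala sketch of makeIdNumeric: left-pad the application
    // sequence number to 10 characters so lexicographic order agrees with
    // numeric order.
    def makeIdNumeric(id: String): String = {
      val parts = id.split("_")
      if (parts.length < 3) id
      else {
        val seq = parts(2)
        parts(0) + "_" + parts(1) + "_" + ("0" * math.max(0, 10 - seq.length)) + seq
      }
    }

    // makeIdNumeric("application_1452049845000_9")  // ..._0000000009
    // makeIdNumeric("application_1452049845000_10") // ..._0000000010
    // "...0000000009" < "...0000000010" as strings, matching numeric order.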

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 25 additions & 0 deletions
@@ -503,6 +503,31 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         set("spark.executor.instances", value)
       }
     }
+
+    if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
+      val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
+        "instead use \"yarn\" with specified deploy mode."
+
+      get("spark.master") match {
+        case "yarn-cluster" =>
+          logWarning(warning)
+          set("spark.master", "yarn")
+          set("spark.submit.deployMode", "cluster")
+        case "yarn-client" =>
+          logWarning(warning)
+          set("spark.master", "yarn")
+          set("spark.submit.deployMode", "client")
+        case _ => // Any other unexpected master will be checked when creating scheduler backend.
+      }
+    }
+
+    if (contains("spark.submit.deployMode")) {
+      get("spark.submit.deployMode") match {
+        case "cluster" | "client" =>
+        case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
+          "\"client\".")
+      }
+    }
   }

   /**
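The net effect of the first block is that the deprecated fused master strings are rewritten into a (master, deploy mode) pair before anything else reads them. A standalone Scala sketch of that mapping (the helper name is hypothetical, not part of the commit):

    // Sketch of the translation added above (helper name is hypothetical):
    def normalizeYarnMaster(master: String): (String, Option[String]) = master match {
      case "yarn-cluster" => ("yarn", Some("cluster"))  // deprecated spelling
      case "yarn-client"  => ("yarn", Some("client"))   // deprecated spelling
      case other          => (other, None)              // validated later by the scheduler backend
    }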

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 14 additions & 25 deletions
@@ -237,6 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def jars: Seq[String] = _jars
   def files: Seq[String] = _files
   def master: String = _conf.get("spark.master")
+  def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
   def appName: String = _conf.get("spark.app.name")

   private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)

@@ -297,9 +298,6 @@
   val sparkUser = Utils.getCurrentUserName()

   private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
-  private[spark] def schedulerBackend_=(sb: SchedulerBackend): Unit = {
-    _schedulerBackend = sb
-  }

   private[spark] def taskScheduler: TaskScheduler = _taskScheduler
   private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {

@@ -322,8 +320,6 @@
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId

-  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
-
   private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger

   private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =

@@ -380,10 +376,8 @@
     }

     // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    // yarn-standalone is deprecated, but still supported
-    if ((master == "yarn-cluster" || master == "yarn-standalone") &&
-        !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
+    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
+      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
     }

@@ -419,7 +413,7 @@
      }
    }

-    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+    if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")

    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.

@@ -496,7 +490,7 @@
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // Create and start the scheduler
-    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)

@@ -514,9 +508,9 @@

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
-    metricsSystem.start()
+    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
-    metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

    _eventLogger =
      if (isEventLogEnabled) {

@@ -1595,10 +1589,8 @@
      key = uri.getScheme match {
        // A JAR file which exists only on the driver node
        case null | "file" =>
-          // yarn-standalone is deprecated, but still supported
-          if (SparkHadoopUtil.get.isYarnMode() &&
-              (master == "yarn-standalone" || master == "yarn-cluster")) {
-            // In order for this to work in yarn-cluster mode the user must specify the
+          if (master == "yarn" && deployMode == "cluster") {
+            // In order for this to work in yarn cluster mode the user must specify the
            // --addJars option to the client to upload the file into the distributed cache
            // of the AM to make it show up in the current working directory.
            val fileName = new Path(uri.getPath).getName()

@@ -2324,7 +2316,8 @@ object SparkContext extends Logging {
   */
  private def createTaskScheduler(
      sc: SparkContext,
-      master: String): (SchedulerBackend, TaskScheduler) = {
+      master: String,
+      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.

@@ -2386,11 +2379,7 @@
        }
        (backend, scheduler)

-      case "yarn-standalone" | "yarn-cluster" =>
-        if (master == "yarn-standalone") {
-          logWarning(
-            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
-        }
+      case "yarn" if deployMode == "cluster" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])

@@ -2415,7 +2404,7 @@
        scheduler.initialize(backend)
        (backend, scheduler)

-      case "yarn-client" =>
+      case "yarn" if deployMode == "client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])

@@ -2456,7 +2445,7 @@
      case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
-        createTaskScheduler(sc, "mesos://" + zkUrl)
+        createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
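With deployMode threaded through, createTaskScheduler selects the YARN scheduler class from the (master, deployMode) pair instead of parsing a fused string. A reduced sketch of that dispatch (the helper name is hypothetical; the class names are the ones loaded reflectively above):

    // Hypothetical helper distilling the yarn cases of createTaskScheduler:
    def yarnSchedulerClassName(master: String, deployMode: String): String =
      (master, deployMode) match {
        case ("yarn", "cluster") => "org.apache.spark.scheduler.cluster.YarnClusterScheduler"
        case ("yarn", "client")  => "org.apache.spark.scheduler.cluster.YarnScheduler"
        case _ =>
          throw new IllegalArgumentException(s"Could not parse Master URL: '$master'")
      }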

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 12 additions & 9 deletions
@@ -226,11 +226,17 @@ object SparkSubmit {

    // Set the cluster manager
    val clusterManager: Int = args.master match {
-      case m if m.startsWith("yarn") => YARN
+      case "yarn" => YARN
+      case "yarn-client" | "yarn-cluster" =>
+        printWarning(s"Master ${args.master} is deprecated since 2.0." +
+          " Please use master \"yarn\" with specified deploy mode instead.")
+        YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
-      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
+      case _ =>
+        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
+        -1
    }

    // Set the deploy mode; default is client mode

@@ -240,23 +246,20 @@
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
    }

-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
+    // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
+    // the master and deploy mode, we have some logic to infer the master and deploy mode
    // from each other if only one is specified, or exit early if they are at odds.
    if (clusterManager == YARN) {
-      if (args.master == "yarn-standalone") {
-        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
-        args.master = "yarn-cluster"
-      }
      (args.master, args.deployMode) match {
        case ("yarn-cluster", null) =>
          deployMode = CLUSTER
+          args.master = "yarn"
        case ("yarn-cluster", "client") =>
          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
        case ("yarn-client", "cluster") =>
          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
        case (_, mode) =>
-          args.master = "yarn-" + Option(mode).getOrElse("client")
+          args.master = "yarn"
      }

      // Make sure YARN is included in our build if we're trying to use it
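In user-facing terms, the canonical invocation is now, for example:

    spark-submit --master yarn --deploy-mode cluster <app-jar>

while the old fused spellings (--master yarn-cluster, --master yarn-client) still resolve to the same master/deploy-mode pair but print a deprecation warning. These command lines are illustrative; only the --master and --deploy-mode handling comes from this diff.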

core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {

  @GET
  def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.storageStatusList
+    val storageStatusList = ui.storageListener.activeStorageStatusList
    val rddInfos = ui.storageListener.rddInfoList
    rddInfos.map{rddInfo =>
      AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,

@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
      rddId: Int,
      listener: StorageListener,
      includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.storageStatusList
+    val storageStatusList = listener.activeStorageStatusList
    listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
      getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
    }

core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala

Lines changed: 2 additions & 2 deletions
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
    listener.synchronized {
      // The follow codes should be protected by `listener` to make sure no executors will be
      // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.storageStatusList
+      val storageStatusList = listener.activeStorageStatusList
      (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId)
+        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
      }
    }
  }

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
 class ExecutorSummary private[spark](
    val id: String,
    val hostPort: String,
+    val isActive: Boolean,
    val rddBlocks: Int,
    val memoryUsed: Long,
    val diskUsed: Long,

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala

Lines changed: 1 addition & 0 deletions
@@ -203,6 +203,7 @@ private[spark] class DiskBlockObjectWriter(
    numRecordsWritten += 1
    writeMetrics.incRecordsWritten(1)

+    // TODO: call updateBytesWritten() less frequently.
    if (numRecordsWritten % 32 == 0) {
      updateBytesWritten()
    }

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 14 additions & 3 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import scala.collection.mutable

+import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._

@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
 * This class is thread-safe (unlike JobProgressListener)
 */
 @DeveloperApi
-class StorageStatusListener extends SparkListener {
+class StorageStatusListener(conf: SparkConf) extends SparkListener {
  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
+  private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

  def storageStatusList: Seq[StorageStatus] = synchronized {
    executorIdToStorageStatus.values.toSeq
  }

+  def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+    deadExecutorStorageStatus.toSeq
+  }
+
  /** Update storage status list to reflect updated block statuses */
  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
    executorIdToStorageStatus.get(execId).foreach { storageStatus =>

@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
    synchronized {
      val executorId = blockManagerRemoved.blockManagerId.executorId
-      executorIdToStorageStatus.remove(executorId)
+      executorIdToStorageStatus.remove(executorId).foreach { status =>
+        deadExecutorStorageStatus += status
+      }
+      if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+        deadExecutorStorageStatus.trimStart(1)
+      }
    }
  }
-
 }
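The retention logic is a simple bounded FIFO: a removed executor's storage status is appended to deadExecutorStorageStatus, and once the buffer exceeds the cap (spark.ui.retainedDeadExecutors, default 100) the oldest entry is dropped. A standalone sketch of the same pattern (illustrative; the class name is not part of the commit):

    import scala.collection.mutable

    // Bounded FIFO buffer mirroring the dead-executor retention above
    // (illustrative sketch; class name is hypothetical).
    class BoundedStatusBuffer[A](capacity: Int) {
      private val buf = mutable.ListBuffer.empty[A]

      def add(status: A): Unit = synchronized {
        buf += status
        if (buf.size > capacity) {
          buf.trimStart(1) // evict the oldest retained entry
        }
      }

      def snapshot: Seq[A] = synchronized(buf.toList)
    }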
