
Commit 1cae338

Merge remote-tracking branch 'origin/master' into SPARK-21040-speculate-decommission-exec-tasks
2 parents: f5a7313 + 45864fa

File tree: 239 files changed, +6738 −2877 lines


core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 5 additions & 0 deletions
@@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
     onEvent(speculativeTask);
   }
 
+  @Override
+  public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
+    onEvent(event);
+  }
+
   @Override
   public void onOtherEvent(SparkListenerEvent event) {
     onEvent(event);
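
Because SparkFirehoseListener funnels every callback through a single onEvent method, this override keeps resource-profile events flowing to existing firehose subclasses without further changes on their side. A minimal sketch of a consumer (the class name and body are illustrative, not part of the commit):

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// Illustrative subclass: every event, including the new
// SparkListenerResourceProfileAdded, arrives through the single onEvent hook.
class EventNameLogger extends SparkFirehoseListener {
  override def onEvent(event: SparkListenerEvent): Unit = {
    println(s"event: ${event.getClass.getSimpleName}")
  }
}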

core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ <h4 class="title-table">Executors</h4>
       <th>Disk Used</th>
       <th>Cores</th>
       <th>Resources</th>
+      <th>Resource Profile Id</th>
       <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
       <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>
       <th>Complete Tasks</th>

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 5 additions & 2 deletions
@@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
 }
 
 var sumOptionalColumns = [3, 4];
-var execOptionalColumns = [5, 6, 9];
+var execOptionalColumns = [5, 6, 9, 10];
 var execDataTable;
 var sumDataTable;
 
@@ -415,6 +415,7 @@ $(document).ready(function () {
     {data: 'diskUsed', render: formatBytes},
     {data: 'totalCores'},
     {name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
+    {name: 'resourceProfileIdCol', data: 'resourceProfileId'},
     {
       data: 'activeTasks',
       "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@@ -461,7 +462,8 @@ $(document).ready(function () {
     "columnDefs": [
       {"visible": false, "targets": 5},
       {"visible": false, "targets": 6},
-      {"visible": false, "targets": 9}
+      {"visible": false, "targets": 9},
+      {"visible": false, "targets": 10}
     ],
     "deferRender": true
   };
@@ -570,6 +572,7 @@ $(document).ready(function () {
     "<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
     "<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
     "<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
+    "<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'>Resource Profile Id</div>" +
     "</div>");
 
   reselectCheckboxesBasedOnTaskTableState();
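
The new column lands at index 10 of the executors table, so it is appended to execOptionalColumns, hidden by default via the extra columnDefs target, and given its own show/hide checkbox, mirroring how the existing optional Resources column (index 9) is handled.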

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _listenerBus = new LiveListenerBus(_conf)
-    _resourceProfileManager = new ResourceProfileManager(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus)
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
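
This one-line change wires the freshly created listener bus into the manager, which is what lets addResourceProfile post SparkListenerResourceProfileAdded events (see the ResourceProfileManager change below).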

core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala

Lines changed: 2 additions & 1 deletion
@@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore(
       source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
       source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
       source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
-      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
+      source.resourceProfileId)
   }
 
 }

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala

Lines changed: 9 additions & 6 deletions
@@ -122,18 +122,20 @@ private class HistoryServerDiskManager(
    * being used so that it's not evicted when running out of designated space.
    */
   def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+    var newSize: Long = 0
     val storePath = active.synchronized {
       val path = appStorePath(appId, attemptId)
       if (path.isDirectory()) {
-        active(appId -> attemptId) = sizeOf(path)
+        newSize = sizeOf(path)
+        active(appId -> attemptId) = newSize
         Some(path)
       } else {
         None
       }
     }
 
     storePath.foreach { path =>
-      updateAccessTime(appId, attemptId)
+      updateApplicationStoreInfo(appId, attemptId, newSize)
     }
 
     storePath
@@ -238,10 +240,11 @@ private class HistoryServerDiskManager(
       new File(appStoreDir, fileName)
     }
 
-  private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
+  private def updateApplicationStoreInfo(
+      appId: String, attemptId: Option[String], newSize: Long): Unit = {
     val path = appStorePath(appId, attemptId)
-    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
-      sizeOf(path))
+    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId,
+      attemptId, newSize)
     listing.write(info)
   }
 
@@ -297,7 +300,7 @@ private class HistoryServerDiskManager(
         s"exceeded ($current > $max)")
     }
 
-    updateAccessTime(appId, attemptId)
+    updateApplicationStoreInfo(appId, attemptId, newSize)
 
     active.synchronized {
       active(appId -> attemptId) = newSize
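
Beyond the rename, the refactor threads the already-computed newSize into the helper, so sizeOf(path), a recursive directory walk, no longer runs a second time when the store info is written back to the listing.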

core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala

Lines changed: 5 additions & 2 deletions
@@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.Utils.isTesting
 
 /**
  * Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles
  * and everywhere else we can use the ResourceProfile Id to save on space.
- * Note we never remove a resource profile at this point. Its expected this number if small
+ * Note we never remove a resource profile at this point. Its expected this number is small
  * so this shouldn't be much overhead.
  */
 @Evolving
-private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
+private[spark] class ResourceProfileManager(sparkConf: SparkConf,
+    listenerBus: LiveListenerBus) extends Logging {
   private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
 
   private val (readLock, writeLock) = {
@@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
     // force the computation of maxTasks and limitingResource now so we don't have cost later
     rp.limitingResource(sparkConf)
     logInfo(s"Added ResourceProfile id: ${rp.id}")
+    listenerBus.post(SparkListenerResourceProfileAdded(rp))
   }
 }
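
From the application side, the usual way a profile reaches addResourceProfile is the stage-level scheduling API. A hedged sketch, assuming a live SparkContext sc and a cluster manager that supports stage-level scheduling in this development line:

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build a custom profile; when it reaches ResourceProfileManager.addResourceProfile,
// the manager now also posts SparkListenerResourceProfileAdded to the bus.
val profile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4).memory("6g"))
  .require(new TaskResourceRequests().cpus(2))
  .build()

val rdd = sc.parallelize(1 to 100, numSlices = 4)
rdd.withResources(profile).map(_ * 2).collect()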

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 0 deletions
@@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
     }
   }
 
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
       logEvent(event, flushLogger = true)
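
Flushing on every occurrence (flushLogger = true) makes the record durable as soon as the profile is added, which suits an infrequent, structural event like this. None of it matters unless event logging is enabled; a hedged configuration sketch (the log directory is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                  // turn on event logging
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")  // illustrative location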

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 12 additions & 0 deletions
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 import org.apache.spark.TaskEndReason
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 
@@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
+  extends SparkListenerEvent
+
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
@@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
    * Called when other events like SQL-specific events are posted.
    */
   def onOtherEvent(event: SparkListenerEvent): Unit
+
+  /**
+   * Called when a Resource Profile is added to the manager.
+   */
+  def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
 }
 
 
@@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface {
     speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
+
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { }
 }
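
The default no-op override at the bottom means existing SparkListener subclasses keep compiling unchanged; only listeners that care about resource profiles need the new hook. An illustrative listener (name and body are not from the commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

// Illustrative consumer of the new event.
class ResourceProfileAuditListener extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    val rp = event.resourceProfile
    println(s"ResourceProfile ${rp.id} added: " +
      s"${rp.executorResources.size} executor / ${rp.taskResources.size} task requests")
  }
}

// Registration, assuming a live SparkContext named sc:
// sc.addSparkListener(new ResourceProfileAuditListener())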

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 2 additions & 0 deletions
@@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
       listener.onBlockUpdated(blockUpdated)
     case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
       listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+    case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+      listener.onResourceProfileAdded(resourceProfileAdded)
     case _ => listener.onOtherEvent(event)
   }
 }
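
With a dedicated match case, the event is dispatched to onResourceProfileAdded instead of falling through to onOtherEvent, which is why this commit also adds the no-op default in SparkListener and the forwarding override in SparkFirehoseListener.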
