Commit 457166e

Merge remote-tracking branch 'apache/master' into SPARK-5886

2 parents: f8b30f4 + 6930e96

61 files changed: +929 additions, -671 deletions


core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 1 addition & 1 deletion

@@ -91,7 +91,7 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] val requestedCores = desc.maxCores.getOrElse(defaultCores)
+  private val requestedCores = desc.maxCores.getOrElse(defaultCores)
 
   private[master] def coresLeft: Int = requestedCores - coresGranted
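The only substantive change is the access qualifier: `requestedCores` drops from `private[master]` to `private` because the Master web UI (see MasterPage.scala below) now displays granted cores rather than requested cores, so nothing outside `ApplicationInfo` reads the field anymore. As a reminder of how Scala's scoped modifiers differ, here is a standalone sketch (illustrative only, not Spark code):

    package org.apache.spark.deploy.master

    class Example {
      // Visible only inside Example itself.
      private val classPrivate = 1

      // Also visible to everything under the org.apache.spark.deploy.master package,
      // e.g. the Master and its web UI pages.
      private[master] val packagePrivate = 2
    }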

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 6 additions & 26 deletions

@@ -75,16 +75,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
-      "Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+      "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
-
-    val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
-      "Submitted Time", "User", "State", "Duration")
+    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
-      completedApps)
+    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
     val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
       "Memory", "Main Class")

@@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
       val killLinkUri = s"app/kill?id=${app.id}&terminate=true"

@@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
         (<a href={killLinkUri} onclick={confirm}>kill</a>)
       </span>
     }
-
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>

@@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
-      {
-        if (active) {
-          <td>
-            {app.coresGranted}
-          </td>
-        }
-      }
       <td>
-        {if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
+        {app.coresGranted}
       </td>
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}

@@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = true)
-  }
-
-  private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = false)
-  }
-
   private def driverRow(driver: DriverInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (driver.state == DriverState.RUNNING ||

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 5 additions & 3 deletions

@@ -81,9 +81,11 @@ class TaskInfo(
 
   def status: String = {
     if (running) {
-      "RUNNING"
-    } else if (gettingResult) {
-      "GET RESULT"
+      if (gettingResult) {
+        "GET RESULT"
+      } else {
+        "RUNNING"
+      }
     } else if (failed) {
       "FAILED"
     } else if (successful) {
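A task that is fetching its result still counts as running (its finish time is not yet set), so with the old ordering the `gettingResult` branch was effectively unreachable and such tasks were reported as RUNNING. The fix checks the narrower state inside the `running` branch. A minimal sketch of the corrected precedence, with plain boolean parameters standing in for the real `TaskInfo` flags:

    // Sketch only: the real TaskInfo derives these flags from finishTime, gettingResultTime, etc.
    def status(running: Boolean, gettingResult: Boolean, failed: Boolean, successful: Boolean): String =
      if (running) {
        // A result-fetching task is still "running", so test the more specific state first.
        if (gettingResult) "GET RESULT" else "RUNNING"
      } else if (failed) {
        "FAILED"
      } else if (successful) {
        "SUCCESS"
      } else {
        "UNKNOWN"
      }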

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala

Lines changed: 4 additions & 0 deletions

@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>

@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {
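The change brackets the writer-creation loop with `System.nanoTime` so that the cost of creating shuffle files and their disk writers is charged to the shuffle write time metric (the same pattern appears again in ExternalSorter.scala below). Reduced to its essentials, the pattern looks like this (a sketch, assuming only a callback that accumulates nanoseconds the way `incShuffleWriteTime` does):

    // Measure how long some disk-touching setup takes and fold it into a write-time metric.
    def timeOpens[T](addShuffleWriteTimeNanos: Long => Unit)(openWriters: => T): T = {
      val openStartTime = System.nanoTime
      val writers = openWriters                       // e.g. creating files and disk writers
      addShuffleWriteTimeNanos(System.nanoTime - openStartTime)
      writers
    }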

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 3 additions & 0 deletions

@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 18 additions & 7 deletions

@@ -269,11 +269,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           </td> +: getFormattedTimeQuantiles(serializationTimes)
 
         val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
-          if (info.gettingResultTime > 0) {
-            (info.finishTime - info.gettingResultTime).toDouble
-          } else {
-            0.0
-          }
+          getGettingResultTime(info).toDouble
         }
         val gettingResultQuantiles =
           <td>

@@ -464,7 +460,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
      val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
      val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
      val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-     val gettingResultTime = info.gettingResultTime
+     val gettingResultTime = getGettingResultTime(info)
 
      val maybeAccumulators = info.accumulables
      val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"}

@@ -627,6 +623,19 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     <td>{errorSummary}{details}</td>
   }
 
+  private def getGettingResultTime(info: TaskInfo): Long = {
+    if (info.gettingResultTime > 0) {
+      if (info.finishTime > 0) {
+        info.finishTime - info.gettingResultTime
+      } else {
+        // The task is still fetching the result.
+        System.currentTimeMillis - info.gettingResultTime
+      }
+    } else {
+      0L
+    }
+  }
+
   private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
     val totalExecutionTime =
       if (info.gettingResult) {

@@ -638,6 +647,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
       }
     val executorOverhead = (metrics.executorDeserializeTime +
       metrics.resultSerializationTime)
-    math.max(0, totalExecutionTime - metrics.executorRunTime - executorOverhead)
+    math.max(
+      0,
+      totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info))
   }
 }
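Two things change here: `getGettingResultTime` now returns a live value (measured against the current time) for tasks that are still fetching their result, instead of the negative value the old `finishTime - gettingResultTime` expression produced for in-flight tasks, and the scheduler delay subtracts that result-fetch time so it is no longer misattributed to scheduling. The arithmetic, in a simplified standalone sketch (parameter names are illustrative, not the exact StagePage fields):

    // Scheduler delay is the wall-clock time left over after accounting for everything the
    // executor actually did: running the task, (de)serialization, and fetching the result.
    def schedulerDelay(
        totalExecutionTime: Long,
        executorRunTime: Long,
        deserializeTime: Long,
        serializeTime: Long,
        gettingResultTime: Long): Long = {
      val executorOverhead = deserializeTime + serializeTime
      math.max(0L, totalExecutionTime - executorRunTime - executorOverhead - gettingResultTime)
    }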

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 5 additions & 0 deletions

@@ -352,13 +352,18 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
+      val openStartTime = System.nanoTime
       partitionWriters = Array.fill(numPartitions) {
         // Because these files may be read during shuffle, their compression must be controlled by
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
         // createTempShuffleBlock here; see SPARK-3426 for more context.
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }
 
     // No need to sort stuff, just write each element out

docs/running-on-yarn.md

Lines changed: 1 addition & 1 deletion

@@ -274,6 +274,6 @@ If you need a reference to the proper location to put log files in the YARN so t
 # Important notes
 
 - Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
-- The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored.
+- In `yarn-cluster` mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored. In `yarn-client` mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in `spark.local.dir`. This is because the Spark driver does not run on the YARN cluster in `yarn-client` mode, only the Spark executors do.
 - The `--files` and `--archives` options support specifying file names with the # similar to Hadoop. For example you can specify: `--files localtest.txt#appSees.txt` and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name `appSees.txt`, and your application should use the name as `appSees.txt` to reference it when running on YARN.
 - The `--jars` option allows the `SparkContext.addJar` function to work if you are using it with local files and running in `yarn-cluster` mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
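The distinction matters, for instance, when the driver machine has different scratch space than the YARN nodes. An illustrative snippet (not part of the documentation change; the master string, paths, and app name are made up for the example):

    import org.apache.spark.{SparkConf, SparkContext}

    // In yarn-client mode the driver runs outside the YARN cluster, so it honors spark.local.dir,
    // while the executors on the cluster keep using YARN's yarn.nodemanager.local-dirs.
    val conf = new SparkConf()
      .setAppName("local-dirs-example")
      .setMaster("yarn-client")
      .set("spark.local.dir", "/mnt/fast-disk/spark-driver-scratch")  // driver-side only here
    val sc = new SparkContext(conf)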

repl/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -84,6 +84,11 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
     <dependency>
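`mockito-all` is added with test scope, presumably to support tests for the updated `ExecutorClassLoader` below. A generic sketch of what the dependency enables (hypothetical test code, not the actual suite):

    import java.net.HttpURLConnection
    import org.mockito.Mockito.{mock, when}

    // Stub an HTTP connection so failure paths (e.g. the class server returning 404)
    // can be exercised without standing up a real server.
    val connection = mock(classOf[HttpURLConnection])
    when(connection.getResponseCode).thenReturn(HttpURLConnection.HTTP_NOT_FOUND)
    assert(connection.getResponseCode == 404)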

repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala

Lines changed: 67 additions & 18 deletions

@@ -17,9 +17,10 @@
 
 package org.apache.spark.repl
 
-import java.io.{ByteArrayOutputStream, InputStream, FileNotFoundException}
-import java.net.{URI, URL, URLEncoder}
-import java.util.concurrent.{Executors, ExecutorService}
+import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.net.{HttpURLConnection, URI, URL, URLEncoder}
+
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 

@@ -43,6 +44,9 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
 
   val parentLoader = new ParentClassLoader(parent)
 
+  // Allows HTTP connect and read timeouts to be controlled for testing / debugging purposes
+  private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
+
   // Hadoop FileSystem object for our URI, if it isn't using HTTP
   var fileSystem: FileSystem = {
     if (Set("http", "https", "ftp").contains(uri.getScheme)) {

@@ -71,37 +75,82 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
     }
   }
 
+  private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {
+    val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+      val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+      val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
+      newuri.toURL
+    } else {
+      new URL(classUri + "/" + urlEncode(pathInDirectory))
+    }
+    val connection: HttpURLConnection = Utils.setupSecureURLConnection(url.openConnection(),
+      SparkEnv.get.securityManager).asInstanceOf[HttpURLConnection]
+    // Set the connection timeouts (for testing purposes)
+    if (httpUrlConnectionTimeoutMillis != -1) {
+      connection.setConnectTimeout(httpUrlConnectionTimeoutMillis)
+      connection.setReadTimeout(httpUrlConnectionTimeoutMillis)
+    }
+    connection.connect()
+    try {
+      if (connection.getResponseCode != 200) {
+        // Close the error stream so that the connection is eligible for re-use
+        try {
+          connection.getErrorStream.close()
+        } catch {
+          case ioe: IOException =>
+            logError("Exception while closing error stream", ioe)
+        }
+        throw new ClassNotFoundException(s"Class file not found at URL $url")
+      } else {
+        connection.getInputStream
+      }
+    } catch {
+      case NonFatal(e) if !e.isInstanceOf[ClassNotFoundException] =>
+        connection.disconnect()
+        throw e
+    }
+  }
+
+  private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): InputStream = {
+    val path = new Path(directory, pathInDirectory)
+    if (fileSystem.exists(path)) {
+      fileSystem.open(path)
+    } else {
+      throw new ClassNotFoundException(s"Class file not found at path $path")
+    }
+  }
+
   def findClassLocally(name: String): Option[Class[_]] = {
+    val pathInDirectory = name.replace('.', '/') + ".class"
+    var inputStream: InputStream = null
     try {
-      val pathInDirectory = name.replace('.', '/') + ".class"
-      val inputStream = {
+      inputStream = {
         if (fileSystem != null) {
-          fileSystem.open(new Path(directory, pathInDirectory))
+          getClassFileInputStreamFromFileSystem(pathInDirectory)
         } else {
-          val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
-            val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
-            val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
-            newuri.toURL
-          } else {
-            new URL(classUri + "/" + urlEncode(pathInDirectory))
-          }
-
-          Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager)
-            .getInputStream
+          getClassFileInputStreamFromHttpServer(pathInDirectory)
         }
       }
       val bytes = readAndTransformClass(name, inputStream)
-      inputStream.close()
       Some(defineClass(name, bytes, 0, bytes.length))
     } catch {
-      case e: FileNotFoundException =>
+      case e: ClassNotFoundException =>
        // We did not find the class
        logDebug(s"Did not load class $name from REPL class server at $uri", e)
        None
      case e: Exception =>
        // Something bad happened while checking if the class exists
        logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
        None
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case e: Exception =>
+            logError("Exception while closing inputStream", e)
+        }
+      }
     }
   }
 
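The refactor splits class fetching into an HDFS/local-filesystem path and an HTTP path, converts "class not found" into `ClassNotFoundException` in both, and moves stream cleanup into a `finally` block so `findClassLocally` no longer leaks the input stream when `readAndTransformClass` or `defineClass` throws. The HTTP path also closes the error stream on non-200 responses so the keep-alive connection stays eligible for reuse, and calls `disconnect()` on unexpected errors. That connection-handling pattern, reduced to a standalone sketch that uses only the JDK:

    import java.io.{IOException, InputStream}
    import java.net.{HttpURLConnection, URL}
    import scala.util.control.NonFatal

    // Treat a non-200 response as "class not found"; close the error stream so the underlying
    // keep-alive connection stays reusable; tear the connection down on any other failure.
    def openClassStream(url: URL): InputStream = {
      val connection = url.openConnection().asInstanceOf[HttpURLConnection]
      connection.connect()
      try {
        if (connection.getResponseCode != HttpURLConnection.HTTP_OK) {
          try {
            connection.getErrorStream.close()
          } catch {
            case _: IOException => // the real code logs this; ignored in the sketch
          }
          throw new ClassNotFoundException(s"Class file not found at URL $url")
        } else {
          connection.getInputStream
        }
      } catch {
        case NonFatal(e) if !e.isInstanceOf[ClassNotFoundException] =>
          connection.disconnect()
          throw e
      }
    }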
