Skip to content

Commit dadff5f

Browse files
erenavsarogullarikayousterhout
authored andcommitted
[SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Logging
Fair Scheduler Logging for the following cases can be useful for the user. 1. If **valid** `spark.scheduler.allocation.file` property is set, user can be informed and aware which scheduler file is processed when `SparkContext` initializes. 2. If **invalid** `spark.scheduler.allocation.file` property is set, currently, the following stacktrace is shown to user. In addition to this, more meaningful message can be shown to user by emphasizing the problem at building level of fair scheduler. Also other potential issues can be covered at this level as **Fair Scheduler can not be built. + exception stacktrace** ``` Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at java.io.FileInputStream.<init>(FileInputStream.java:93) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) ``` 3. If `spark.scheduler.allocation.file` property is not set and **default** fair scheduler file (**fairscheduler.xml**) is found in classpath, it will be loaded but currently, user is not informed for using default file so logging can be useful as **Fair Scheduler file: fairscheduler.xml is found successfully and will be parsed.** 4. If **spark.scheduler.allocation.file** property is not set and **default** fair scheduler file does not exist in classpath, currently, user is not informed so logging can be useful as **No Fair Scheduler file found.** Also this PR is related with #15237 to emphasize fileName in warning logs when fair scheduler file has invalid minShare, weight or schedulingMode values. ## How was this patch tested? Added new Unit Tests. Author: erenavsarogullari <[email protected]> Closes #16813 from erenavsarogullari/SPARK-19466.
1 parent c5a6635 commit dadff5f

File tree

1 file changed

+46
-21
lines changed

1 file changed

+46
-21
lines changed

core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
2020
import java.io.{FileInputStream, InputStream}
2121
import java.util.{NoSuchElementException, Properties}
2222

23+
import scala.util.control.NonFatal
2324
import scala.xml.{Node, XML}
2425

2526
import org.apache.spark.SparkConf
@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
5556
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
5657
extends SchedulableBuilder with Logging {
5758

58-
val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
59+
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
60+
val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
5961
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
6062
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
6163
val DEFAULT_POOL_NAME = "default"
@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
6971
val DEFAULT_WEIGHT = 1
7072

7173
override def buildPools() {
72-
var is: Option[InputStream] = None
74+
var fileData: Option[(InputStream, String)] = None
7375
try {
74-
is = Option {
75-
schedulerAllocFile.map { f =>
76-
new FileInputStream(f)
77-
}.getOrElse {
78-
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
76+
fileData = schedulerAllocFile.map { f =>
77+
val fis = new FileInputStream(f)
78+
logInfo(s"Creating Fair Scheduler pools from $f")
79+
Some((fis, f))
80+
}.getOrElse {
81+
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
82+
if (is != null) {
83+
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
84+
Some((is, DEFAULT_SCHEDULER_FILE))
85+
} else {
86+
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
87+
s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
88+
s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
89+
None
7990
}
8091
}
8192

82-
is.foreach { i => buildFairSchedulerPool(i) }
93+
fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
94+
} catch {
95+
case NonFatal(t) =>
96+
val defaultMessage = "Error while building the fair scheduler pools"
97+
val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
98+
.getOrElse(defaultMessage)
99+
logError(message, t)
100+
throw t
83101
} finally {
84-
is.foreach(_.close())
102+
fileData.foreach { case (is, fileName) => is.close() }
85103
}
86104

87105
// finally create "default" pool
@@ -93,36 +111,41 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
93111
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
94112
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
95113
rootPool.addSchedulable(pool)
96-
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
114+
logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
97115
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
98116
}
99117
}
100118

101-
private def buildFairSchedulerPool(is: InputStream) {
119+
private def buildFairSchedulerPool(is: InputStream, fileName: String) {
102120
val xml = XML.load(is)
103121
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
104122

105123
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
106124

107-
val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
108-
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
109-
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
125+
val schedulingMode = getSchedulingModeValue(poolNode, poolName,
126+
DEFAULT_SCHEDULING_MODE, fileName)
127+
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
128+
DEFAULT_MINIMUM_SHARE, fileName)
129+
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
130+
DEFAULT_WEIGHT, fileName)
110131

111132
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
112133

113-
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
134+
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
114135
poolName, schedulingMode, minShare, weight))
115136
}
116137
}
117138

118139
private def getSchedulingModeValue(
119140
poolNode: Node,
120141
poolName: String,
121-
defaultValue: SchedulingMode): SchedulingMode = {
142+
defaultValue: SchedulingMode,
143+
fileName: String): SchedulingMode = {
122144

123145
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
124-
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
125-
s"schedulingMode: $defaultValue for pool: $poolName"
146+
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
147+
s"Fair Scheduler configuration file: $fileName, using " +
148+
s"the default schedulingMode: $defaultValue for pool: $poolName"
126149
try {
127150
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
128151
SchedulingMode.withName(xmlSchedulingMode)
@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
140163
private def getIntValue(
141164
poolNode: Node,
142165
poolName: String,
143-
propertyName: String, defaultValue: Int): Int = {
166+
propertyName: String,
167+
defaultValue: Int,
168+
fileName: String): Int = {
144169

145170
val data = (poolNode \ propertyName).text.trim
146171
try {
147172
data.toInt
148173
} catch {
149174
case e: NumberFormatException =>
150-
logWarning(s"Error while loading scheduler allocation file. " +
175+
logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
151176
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
152177
s"$defaultValue for pool: $poolName")
153178
defaultValue
@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
166191
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
167192
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
168193
rootPool.addSchedulable(parentPool)
169-
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
194+
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
170195
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
171196
}
172197
}

0 commit comments

Comments
 (0)