
Commit 1991337

Marcelo Vanzin authored and Andrew Or committed
[SPARK-5933] [core] Move config deprecation warnings to SparkConf.
I didn't find many deprecated configs after a grep-based search, but the ones I could find were moved to the centralized location in SparkConf. While there, I deprecated a couple more HS configs that mentioned time units.

Author: Marcelo Vanzin <[email protected]>

Closes apache#5562 from vanzin/SPARK-5933 and squashes the following commits:

dcb617e [Marcelo Vanzin] [SPARK-5933] [core] Move config deprecation warnings to SparkConf.
1 parent 6fbeb82 commit 1991337

File tree: 6 files changed, +30 -39 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 14 additions & 3 deletions
@@ -403,6 +403,9 @@ private[spark] object SparkConf extends Logging {
    */
   private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
     val configs = Seq(
+      DeprecatedConfig("spark.cache.class", "0.8",
+        "The spark.cache.class property is no longer being used! Specify storage levels using " +
+        "the RDD.persist() method instead."),
       DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
         "Please use spark.{driver,executor}.userClassPathFirst instead."))
     Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
@@ -420,7 +423,15 @@ private[spark] object SparkConf extends Logging {
     "spark.history.fs.update.interval" -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
-      AlternateConfig("spark.history.updateInterval", "1.3"))
+      AlternateConfig("spark.history.updateInterval", "1.3")),
+    "spark.history.fs.cleaner.interval" -> Seq(
+      AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
+    "spark.history.fs.cleaner.maxAge" -> Seq(
+      AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
+    "spark.yarn.am.waitTime" -> Seq(
+      AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
+        // Translate old value to a duration, with 10s wait time per try.
+        translation = s => s"${s.toLong * 10}s"))
     )
 
   /**
@@ -470,7 +481,7 @@ private[spark] object SparkConf extends Logging {
     configsWithAlternatives.get(key).flatMap { alts =>
       alts.collectFirst { case alt if conf.contains(alt.key) =>
         val value = conf.get(alt.key)
-        alt.translation.map(_(value)).getOrElse(value)
+        if (alt.translation != null) alt.translation(value) else value
       }
     }
   }
@@ -514,6 +525,6 @@ private[spark] object SparkConf extends Logging {
   private case class AlternateConfig(
       key: String,
       version: String,
-      translation: Option[String => String] = None)
+      translation: String => String = null)
 
 }
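
To make the centralized behavior concrete, here is a minimal sketch (not part of the patch) of how these entries surface through the public SparkConf API; the cache-class value is an invented placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf(false)

// The deprecation message for this key is now registered in deprecatedConfigs above,
// so the warning comes from SparkConf rather than the ad-hoc check removed from SparkEnv.
// "some.removed.CacheImpl" is an arbitrary placeholder value.
conf.set("spark.cache.class", "some.removed.CacheImpl")

// An old alternate key resolves to its current name: the legacy
// spark.history.updateInterval is read back through spark.history.fs.update.interval,
// with a bare number interpreted as seconds.
conf.set("spark.history.updateInterval", "30")
assert(conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s") == 30)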

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 8 deletions
@@ -103,7 +103,7 @@ class SparkEnv (
     // actorSystem.awaitTermination()
 
     // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
+
     // If we only stop sc, but the driver process still run as a services then we need to delete
     // the tmp dir, if not, it will create too many tmp dirs.
     // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
@@ -375,12 +375,6 @@ object SparkEnv extends Logging {
       "."
     }
 
-    // Warn about deprecated spark.cache.class property
-    if (conf.contains("spark.cache.class")) {
-      logWarning("The spark.cache.class property is no longer being used! Specify storage " +
-        "levels using the RDD.persist() method instead.")
-    }
-
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
       new OutputCommitCoordinator(conf)
     }
@@ -406,7 +400,7 @@ object SparkEnv extends Logging {
       shuffleMemoryManager,
       outputCommitCoordinator,
       conf)
-
+
     // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
     // called, and we only need to do it for driver. Because driver may run as a service, and if we
     // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 12 deletions
@@ -52,8 +52,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
-    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
+  private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
@@ -130,8 +129,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
-          TimeUnit.MILLISECONDS)
+        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     }
   }
@@ -270,8 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
-        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
@@ -417,12 +414,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
-  // One day
-  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
-
-  // One week
-  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
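
A rough usage sketch (illustrative, assuming SparkConf's standard time getters): the provider's new keys accept unit suffixes, and the old *.seconds keys keep working through the alternates registered in SparkConf above.

import org.apache.spark.SparkConf

val conf = new SparkConf(false)

// New-style key with a unit suffix; the provider's defaults are "1d" and "7d".
conf.set("spark.history.fs.cleaner.interval", "12h")
assert(conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d") == 12 * 3600)

// The old *.seconds key still resolves via its AlternateConfig entry,
// with the bare number read as seconds.
conf.set("spark.history.fs.cleaner.maxAge.seconds", "604800")
assert(conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") == 604800)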

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -217,6 +217,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
 
     val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
     assert(count === 4)
+
+    conf.set("spark.yarn.applicationMaster.waitTries", "42")
+    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
   }
 
 }

docs/monitoring.md

Lines changed: 7 additions & 8 deletions
@@ -153,19 +153,18 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.cleaner.interval.seconds</td>
-    <td>86400</td>
+    <td>spark.history.fs.cleaner.interval</td>
+    <td>1d</td>
     <td>
-      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
-      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
+      How often the job history cleaner checks for files to delete.
+      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.cleaner.maxAge.seconds</td>
-    <td>3600 * 24 * 7</td>
+    <td>spark.history.fs.cleaner.maxAge</td>
+    <td>7d</td>
     <td>
-      Job history files older than this many seconds will be deleted when the history cleaner runs.
-      Defaults to 3600 * 24 * 7 (1 week).
+      Job history files older than this will be deleted when the history cleaner runs.
     </td>
   </tr>
 </table>
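
For example, a History Server deployment might now carry entries like the following in spark-defaults.conf (illustrative values; the property names are the renamed ones documented above):

spark.history.fs.cleaner.enabled   true
spark.history.fs.cleaner.interval  1d
spark.history.fs.cleaner.maxAge    7d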

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 1 addition & 8 deletions
@@ -373,14 +373,7 @@ private[spark] class ApplicationMaster(
   private def waitForSparkContextInitialized(): SparkContext = {
     logInfo("Waiting for spark context initialization")
     sparkContextRef.synchronized {
-      val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
-        .map(_.toLong * 10000L)
-      if (waitTries.isDefined) {
-        logWarning(
-          "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
-      }
-      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime",
-        s"${waitTries.getOrElse(100000L)}ms")
+      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
       val deadline = System.currentTimeMillis() + totalWaitTime
 
       while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
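
A brief sketch of the equivalence (illustrative, not part of the patch): the old try count is translated inside SparkConf at 10 seconds per try, so the ApplicationMaster's single lookup still honors it.

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
conf.set("spark.yarn.applicationMaster.waitTries", "42")

// The AlternateConfig translation turns "42" tries into "420s", so the new-style
// lookup yields 420000 ms, matching the old waitTries * 10000 arithmetic.
assert(conf.getTimeAsMs("spark.yarn.am.waitTime", "100s") == 420000L)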
