@@ -17,8 +17,11 @@
 
 package org.apache.spark
 
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, LinkedHashSet}
+import scala.collection.mutable.LinkedHashSet
+
 import org.apache.spark.serializer.KryoSerializer
 
 /**
@@ -46,12 +49,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
-  private[spark] val settings = new HashMap[String, String]()
+  private val settings = new ConcurrentHashMap[String, String]()
 
   if (loadDefaults) {
     // Load any spark.* system properties
     for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+      set(k, v)
     }
   }
 
@@ -63,7 +66,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings(key) = value
+    settings.put(key, value)
     this
   }
 
@@ -129,15 +132,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set multiple parameters together */
   def setAll(settings: Traversable[(String, String)]) = {
-    this.settings ++= settings
+    this.settings.putAll(settings.toMap.asJava)
     this
   }
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    if (!settings.contains(key)) {
-      settings(key) = value
-    }
+    settings.putIfAbsent(key, value)
     this
   }
 
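The putIfAbsent change above is more than a cleanup. The old contains-then-set pair was two separate map operations, so two threads calling setIfMissing concurrently could interleave between the check and the write, and one thread's value could silently overwrite the other's. Here is a minimal standalone sketch of the difference on a bare ConcurrentHashMap (the object name SetIfMissingSketch is invented for illustration and is not part of this commit):

import java.util.concurrent.ConcurrentHashMap

object SetIfMissingSketch {
  val settings = new ConcurrentHashMap[String, String]()

  // Old pattern: check and write are two separate operations, so another
  // thread can insert between containsKey and put and then be overwritten.
  def setIfMissingRacy(key: String, value: String): Unit = {
    if (!settings.containsKey(key)) {
      settings.put(key, value)
    }
  }

  // New pattern: a single atomic operation on the ConcurrentHashMap.
  def setIfMissingAtomic(key: String, value: String): Unit = {
    settings.putIfAbsent(key, value)
  }
}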
@@ -163,21 +164,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter; throws a NoSuchElementException if it's not set */
   def get(key: String): String = {
-    settings.getOrElse(key, throw new NoSuchElementException(key))
+    getOption(key).getOrElse(throw new NoSuchElementException(key))
   }
 
   /** Get a parameter, falling back to a default if not set */
   def get(key: String, defaultValue: String): String = {
-    settings.getOrElse(key, defaultValue)
+    getOption(key).getOrElse(defaultValue)
   }
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    settings.get(key)
+    Option(settings.get(key))
   }
 
   /** Get all parameters as a list of pairs */
-  def getAll: Array[(String, String)] = settings.clone().toArray
+  def getAll: Array[(String, String)] = {
+    settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
+  }
 
   /** Get a parameter as an integer, falling back to a default if not set */
   def getInt(key: String, defaultValue: Int): Int = {
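The Option(settings.get(key)) wrapping above is needed because java.util.concurrent.ConcurrentHashMap.get returns null for an absent key, unlike the old scala.collection.mutable.HashMap whose get already returned an Option. Since the map rejects null values at insertion time (and set throws a NullPointerException first anyway), null unambiguously means "not set", and Option(...) maps it to None. A small standalone sketch of that behavior, assuming nothing beyond the JDK (GetOptionSketch is an invented name):

import java.util.concurrent.ConcurrentHashMap

object GetOptionSketch extends App {
  val settings = new ConcurrentHashMap[String, String]()
  settings.put("spark.app.name", "demo")

  // null from ConcurrentHashMap.get means the key is absent;
  // Option(...) turns null into None and a real value into Some.
  def getOption(key: String): Option[String] = Option(settings.get(key))

  assert(getOption("spark.app.name") == Some("demo"))
  assert(getOption("spark.missing").isEmpty)
}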
@@ -224,11 +227,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.contains(key)
+  def contains(key: String): Boolean = settings.containsKey(key)
 
   /** Copy this object */
   override def clone: SparkConf = {
-    new SparkConf(false).setAll(settings)
+    new SparkConf(false).setAll(getAll)
  }
 
   /**
@@ -240,7 +243,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
    * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
   private[spark] def validateSettings() {
-    if (settings.contains("spark.local.dir")) {
+    if (contains("spark.local.dir")) {
       val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
         "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
       logWarning(msg)
@@ -265,7 +268,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     }
 
     // Validate spark.executor.extraJavaOptions
-    settings.get(executorOptsKey).map { javaOpts =>
+    getOption(executorOptsKey).map { javaOpts =>
       if (javaOpts.contains("-Dspark")) {
         val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
           "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
@@ -345,7 +348,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * configuration out for debugging.
    */
   def toDebugString: String = {
-    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+    getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
 
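Taken together, these changes also make snapshot-style reads such as getAll and toDebugString safe while another thread is writing: ConcurrentHashMap iterators are weakly consistent and never throw ConcurrentModificationException, whereas iterating the old unsynchronized scala HashMap during a concurrent put could fail or observe inconsistent state. A standalone sketch of that property (object and key names invented for illustration; assumes only the JDK and scala.collection.JavaConverters):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object ConcurrentSnapshotSketch extends App {
  val settings = new ConcurrentHashMap[String, String]()

  // One thread keeps adding entries...
  val writer = new Thread(new Runnable {
    def run(): Unit = for (i <- 1 to 10000) settings.put(s"spark.key.$i", i.toString)
  })

  // ...while another repeatedly takes getAll-style snapshots. The weakly
  // consistent iterator sees each entry fully formed or not at all.
  val reader = new Thread(new Runnable {
    def run(): Unit = for (_ <- 1 to 100) {
      val snapshot = settings.entrySet().asScala.map(e => (e.getKey, e.getValue)).toArray
      assert(snapshot.forall { case (k, v) => k == s"spark.key.$v" })
    }
  })

  writer.start(); reader.start()
  writer.join(); reader.join()
  println(s"final size: ${settings.size}")
}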