@@ -41,31 +41,6 @@ object ConfigurationUtils extends Logging {
     }).getOrElse(Map.empty[String, String])
   }
 
-  def combinePrefixedKeyValuePairsWithDeprecatedConf(
-      sparkConf: SparkConf,
-      prefix: String,
-      deprecatedConf: OptionalConfigEntry[String],
-      configType: String): Map[String, String] = {
-    val deprecatedKeyValuePairsString = sparkConf.get(deprecatedConf)
-    deprecatedKeyValuePairsString.foreach { _ =>
-      logWarning(s"Configuration with key ${deprecatedConf.key} is deprecated. Use" +
-        s" configurations with prefix $prefix<key> instead.")
-    }
-    val fromDeprecated = parseKeyValuePairs(
-      deprecatedKeyValuePairsString,
-      deprecatedConf.key,
-      configType)
-    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
-    val combined = fromDeprecated.toSeq ++ fromPrefix
-    combined.groupBy(_._1).foreach {
-      case (key, values) =>
-        require(values.size == 1,
-          s"Cannot have multiple values for a given $configType key, got key $key with" +
-            s" values $values")
-    }
-    combined.toMap
-  }
-
   def parsePrefixedKeyValuePairs(
       sparkConf: SparkConf,
       prefix: String,
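For context, a minimal sketch of the surviving helper that every call site below now uses. Its body is not shown in this diff, so the use of `getAllWithPrefix` (borrowed from the removed method above) and the exact behavior are assumptions:

```scala
import org.apache.spark.SparkConf

// Sketch only: the real parsePrefixedKeyValuePairs body is not part of this diff.
object ConfigurationUtilsSketch {
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    // getAllWithPrefix returns (key-without-prefix, value) pairs, so each key appears
    // at most once; the duplicate-key check the removed method needed when merging
    // prefixed entries with the deprecated comma-separated config is no longer required.
    // configType is presumably only used for error messages in the real implementation.
    sparkConf.getAllWithPrefix(prefix).toMap
  }
}
```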
@@ -117,41 +117,7 @@ package object config extends Logging {
   private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
 
-  private[spark] val KUBERNETES_DRIVER_LABELS =
-    ConfigBuilder("spark.kubernetes.driver.labels")
-      .doc("Custom labels that will be added to the driver pod. This should be a comma-separated" +
-        " list of label key-value pairs, where each label is in the format key=value. Note that" +
-        " Spark also adds its own labels to the driver pod for bookkeeping purposes.")
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
-
-  private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
-    ConfigBuilder("spark.kubernetes.driver.annotations")
-      .doc("Custom annotations that will be added to the driver pod. This should be a" +
-        " comma-separated list of annotation key-value pairs, where each annotation is in the" +
-        " format key=value.")
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_EXECUTOR_LABELS =
-    ConfigBuilder("spark.kubernetes.executor.labels")
-      .doc("Custom labels that will be added to the executor pods. This should be a" +
-        " comma-separated list of label key-value pairs, where each label is in the format" +
-        " key=value.")
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_EXECUTOR_ANNOTATIONS =
-    ConfigBuilder("spark.kubernetes.executor.annotations")
-      .doc("Custom annotations that will be added to the executor pods. This should be a" +
-        " comma-separated list of annotation key-value pairs, where each annotation is in the" +
-        " format key=value.")
-      .stringConf
-      .createOptional
-
   private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
   private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
 
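The user-facing consequence of dropping these entries is that labels and annotations can only be specified one key per property, using the prefixes that remain above. A rough before/after sketch (the specific keys and values are invented for illustration):

```scala
import org.apache.spark.SparkConf

// Deprecated form removed above: a single comma-separated property, e.g.
//   spark.kubernetes.driver.labels=team=data,env=prod
// Remaining form: one property per key under the prefixes kept in the config object.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.label.team", "data")
  .set("spark.kubernetes.driver.label.env", "prod")
  .set("spark.kubernetes.executor.annotation.example.com/owner", "data-platform")
```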
@@ -72,10 +72,9 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       .getOrElse(Array.empty[String]) ++
       additionalMainAppPythonFile.toSeq ++
       additionalPythonFiles
-    val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
       submissionSparkConf,
       KUBERNETES_DRIVER_LABEL_PREFIX,
-      KUBERNETES_DRIVER_LABELS,
       "label")
     require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
@@ -67,10 +67,9 @@ private[spark] class BaseDriverConfigurationStep(
         .build()
     }
     val driverCustomAnnotations = ConfigurationUtils
-      .combinePrefixedKeyValuePairsWithDeprecatedConf(
+      .parsePrefixedKeyValuePairs(
         submissionSparkConf,
         KUBERNETES_DRIVER_ANNOTATION_PREFIX,
-        KUBERNETES_DRIVER_ANNOTATIONS,
         "annotation")
     require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
@@ -56,10 +56,9 @@ private[spark] class ExecutorPodFactoryImpl(
     org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
   private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
 
-  private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+  private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
     sparkConf,
     KUBERNETES_EXECUTOR_LABEL_PREFIX,
-    KUBERNETES_EXECUTOR_LABELS,
     "executor label")
   require(
     !executorLabels.contains(SPARK_APP_ID_LABEL),
@@ -70,10 +69,9 @@ private[spark] class ExecutorPodFactoryImpl(
     s" Spark.")
 
   private val executorAnnotations =
-    ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
       sparkConf,
       KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
-      KUBERNETES_EXECUTOR_ANNOTATIONS,
       "executor annotation")
   private val nodeSelector =
     ConfigurationUtils.parsePrefixedKeyValuePairs(
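The `require` guards kept at these call sites still reject user-supplied keys that collide with Spark's bookkeeping labels. A hedged illustration of that check, assuming SPARK_APP_ID_LABEL resolves to the spark-app-selector label (the constant's actual value is not part of this diff):

```scala
import org.apache.spark.SparkConf

// Hypothetical value; the real constant lives in the Kubernetes constants file.
val SPARK_APP_ID_LABEL = "spark-app-selector"

val conf = new SparkConf()
  .set("spark.kubernetes.executor.label." + SPARK_APP_ID_LABEL, "my-own-id")

// Mirrors the check in ExecutorPodFactoryImpl above: constructing the factory with such
// a key fails fast instead of overwriting the label Spark uses to track its executors.
val executorLabels = conf.getAllWithPrefix("spark.kubernetes.executor.label.").toMap
require(
  !executorLabels.contains(SPARK_APP_ID_LABEL),
  s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
```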