From d8885716e75b1f8505006b175edcf44f57170b1e Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 11:30:47 -0700 Subject: [PATCH 1/4] Move executor pod construction to a separate class. This is the first of several measures to make KubernetesClusterSchedulerBackend feasible to test. --- resource-managers/kubernetes/README.md | 7 +- .../kubernetes/ExecutorPodFactory.scala | 294 ++++++++++++++++++ .../kubernetes/KubernetesClusterManager.scala | 10 +- .../KubernetesClusterSchedulerBackend.scala | 287 +---------------- .../NodeAffinityExecutorPodModifier.scala | 84 +++++ .../kubernetes/ShuffleServiceConfig.scala | 22 ++ 6 files changed, 421 insertions(+), 283 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index 31b721d19336..685ff343fa3b 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -14,14 +14,11 @@ important matters to keep in mind when developing this feature. # Building Spark with Kubernetes Support -To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile the Kubernetes core implementation module along with its dependencies: +To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile +the Kubernetes core implementation module along with its dependencies: build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests -If this is the first time you compile the Kubernetes core implementation module, run the following command to install the dependencies and compile: - - build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests - To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when building Spark normally. For example, to build Spark against Hadoop 2.7 and Kubernetes: diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala new file mode 100644 index 000000000000..d9e56bc7f237 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.concurrent.atomic.AtomicLong + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} +import org.apache.spark.util.Utils + +// Strictly an extension of KubernetesClusterSchedulerBakcne that is factored out for testing. +private[spark] trait ExecutorPodFactory { + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + shuffleServiceConfig: Option[ShuffleServiceConfig], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl( + sparkConf: SparkConf, + nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier, + mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], + executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], + executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin]) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val executorExtraClasspath = sparkConf.get( + org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + + private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( + sparkConf, + KUBERNETES_EXECUTOR_LABEL_PREFIX, + KUBERNETES_EXECUTOR_LABELS, + "executor label") + require( + !executorLabels.contains(SPARK_APP_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( + !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + s" Spark.") + + private val executorAnnotations = + ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + KUBERNETES_EXECUTOR_ANNOTATIONS, + "executor annotation") + private val nodeSelector = + ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node-selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockmanagerPort = sparkConf + .getInt("spark.blockmanager.port", 
DEFAULT_BLOCKMANAGER_PORT) + private val kubernetesDriverPodName = sparkConf + .get(KUBERNETES_DRIVER_POD_NAME) + .getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMb = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( + org.apache.spark.internal.config.EXECUTOR_MEMORY.key, + org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMb = sparkConf + .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, + MEMORY_OVERHEAD_MIN)) + private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) + private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + shuffleServiceConfig: Option[ShuffleServiceConfig], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { + val name = s"$executorPodNamePrefix-exec-$executorId" + + // hostname must be no longer than 63 characters, so take the last 63 characters of the pod + // name as the hostname. This preserves uniqueness since the end of name contains + // executorId and applicationId + val hostname = name.substring(Math.max(0, name.length - 63)) + val resolvedExecutorLabels = Map( + SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> applicationId, + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ + executorLabels + val executorMemoryQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryMb}M") + .build() + val executorMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryWithOverhead}M") + .build() + val executorCpuQuantity = new QuantityBuilder(false) + .withAmount(executorCores.toString) + .build() + val executorExtraClasspathEnv = executorExtraClasspath.map { cp => + new EnvVarBuilder() + .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) + .withValue(cp) + .build() + } + val executorExtraJavaOptionsEnv = sparkConf + .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) + .map { opts => + val delimitedOpts = Utils.splitCommandString(opts) + delimitedOpts.zipWithIndex.map { + case (opt, index) => + new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + } + }.getOrElse(Seq.empty[EnvVar]) + val executorEnv = (Seq( + (ENV_EXECUTOR_PORT, executorPort.toString), + (ENV_DRIVER_URL, driverUrl), + // Executor backend expects integral value for executor cores, so round it up to an int. 
+ (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), + (ENV_EXECUTOR_MEMORY, executorMemoryString), + (ENV_APPLICATION_ID, applicationId), + (ENV_EXECUTOR_ID, executorId), + (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs) + .map(env => new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build() + ) ++ Seq( + new EnvVarBuilder() + .withName(ENV_EXECUTOR_POD_IP) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) + .build() + ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + val requiredPorts = Seq( + (EXECUTOR_PORT_NAME, executorPort), + (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) + .map(port => { + new ContainerPortBuilder() + .withName(port._1) + .withContainerPort(port._2) + .build() + }) + + val executorContainer = new ContainerBuilder() + .withName(s"executor") + .withImage(executorDockerImage) + .withImagePullPolicy(dockerImagePullPolicy) + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .endResources() + .addAllToEnv(executorEnv.asJava) + .withPorts(requiredPorts.asJava) + .build() + + val executorPod = new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + + val containerWithExecutorLimitCores = executorLimitCores.map { + limitCores => + val executorCpuLimitQuantity = new QuantityBuilder(false) + .withAmount(limitCores) + .build() + new ContainerBuilder(executorContainer) + .editResources() + .addToLimits("cpu", executorCpuLimitQuantity) + .endResources() + .build() + }.getOrElse(executorContainer) + + val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => + config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => + new ContainerBuilder(container) + .addNewVolumeMount() + .withName(FilenameUtils.getBaseName(dir)) + .withMountPath(dir) + .endVolumeMount() + .build() + } + }.getOrElse(containerWithExecutorLimitCores) + val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => + config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => + new PodBuilder(builder) + .editSpec() + .addNewVolume() + .withName(FilenameUtils.getBaseName(dir)) + .withNewHostPath() + .withPath(dir) + .endHostPath() + .endVolume() + .endSpec() + .build() + } + }.getOrElse(executorPod) + val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = + mountSmallFilesBootstrap.map { bootstrap => + bootstrap.mountSmallFilesSecret( + withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) + }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) + val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = + executorInitContainerBootstrap.map { bootstrap => + val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + withMaybeSmallFilesMountedPod, + new ContainerBuilder().build(), + 
withMaybeSmallFilesMountedContainer)) + + val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => + plugin.mountResourceStagingServerSecretIntoInitContainer( + podWithDetachedInitContainer.initContainer) + }.getOrElse(podWithDetachedInitContainer.initContainer) + + val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( + podWithDetachedInitContainer.pod, resolvedInitContainer) + + val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => + plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) + }.getOrElse(podWithAttachedInitContainer) + + (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) + }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) + + val executorPodWithNodeAffinity = + nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( + executorPodWithInitContainer, nodeToLocalTaskCount) + new PodBuilder(executorPodWithNodeAffinity) + .editSpec() + .addToContainers(initBootstrappedExecutorContainer) + .endSpec() + .build() + } +} + +private object ExecutorPodFactoryImpl { + private val DEFAULT_STATIC_PORT = 10000 +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index b89e81bcb0be..f63d0aeabad3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -96,12 +96,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + val executorPodFactory = new ExecutorPodFactoryImpl( + sparkConf, + NodeAffinityExecutorPodModifierImpl, + mountSmallFilesBootstrap, + executorInitContainerbootStrap, + executorInitContainerSecretVolumePlugin) new KubernetesClusterSchedulerBackend( sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, - executorInitContainerbootStrap, - executorInitContainerSecretVolumePlugin, - mountSmallFilesBootstrap, + executorPodFactory, kubernetesClient) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 357acc6512b5..4a4ec4d284ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -33,10 +33,9 @@ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import 
org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -48,9 +47,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext, - executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], - executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], - mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], + executorPodFactory: ExecutorPodFactory, kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -70,42 +67,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorsToRemove = Collections.newSetFromMap[String]( new ConcurrentHashMap[String, java.lang.Boolean]()).asScala - private val executorExtraClasspath = conf.get( - org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) - private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) - - private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( - conf, - KUBERNETES_EXECUTOR_LABEL_PREFIX, - KUBERNETES_EXECUTOR_LABELS, - "executor label") - require( - !executorLabels.contains(SPARK_APP_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" + - s" reserved for Spark.") - require( - !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + - s" Spark.") - - private val executorAnnotations = - ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( - conf, - KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, - KUBERNETES_EXECUTOR_ANNOTATIONS, - "executor annotation") - private val nodeSelector = - ConfigurationUtils.parsePrefixedKeyValuePairs( - conf, - KUBERNETES_NODE_SELECTOR_PREFIX, - "node-selector") private var shufflePodCache: Option[ShufflePodCache] = None - private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) - private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) - private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) - private val blockmanagerPort = conf - .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) @@ -113,22 +76,8 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException("Must specify the driver pod name")) private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) - private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) - private val executorMemoryString = conf.get( - org.apache.spark.internal.config.EXECUTOR_MEMORY.key, - org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - - private val memoryOverheadMb = conf - .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, - MEMORY_OVERHEAD_MIN)) - private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb - - private val executorCores = conf.getDouble("spark.executor.cores", 1d) - 
private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) - private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) + ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). @@ -180,7 +129,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorWatchResource = new AtomicReference[Closeable] protected var totalExpectedExecutors = new AtomicInteger(0) - private val driverUrl = RpcEndpointAddress( sc.getConf.get("spark.driver.host"), sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), @@ -273,8 +221,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule) - private def getShuffleClient(): KubernetesExternalShuffleClient = { new KubernetesExternalShuffleClient( SparkTransportConf.fromSparkConf(conf, "shuffle"), @@ -388,37 +334,6 @@ private[spark] class KubernetesClusterSchedulerBackend( nodeToLocalTaskCount.toMap[String, Int] } - private def addNodeAffinityAnnotationIfUseful( - baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod = { - def scaleToRange(value: Int, baseMin: Double, baseMax: Double, - rangeMin: Double, rangeMax: Double): Int = - (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt - - if (nodeToTaskCount.nonEmpty) { - val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) - // Normalize to node affinity weights in 1 to 100 range. - val nodeToWeight = nodeToTaskCount.map{ - case (node, taskCount) => - (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))} - val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys) - // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node - val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity( - preferredDuringSchedulingIgnoredDuringExecution = - for ((weight, nodes) <- weightToNodes) yield - WeightedPreference(weight, - Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) - ))) - // TODO: Use non-annotation syntax when we switch to K8s version 1.6. - logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") - new PodBuilder(baseExecutorPod).editMetadata() - .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) - .endMetadata() - .build() - } else { - baseExecutorPod - } - } - /** * Allocates a new executor pod * @@ -429,179 +344,16 @@ private[spark] class KubernetesClusterSchedulerBackend( */ private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString - val name = s"$executorPodNamePrefix-exec-$executorId" - - // hostname must be no longer than 63 characters, so take the last 63 characters of the pod - // name as the hostname. 
This preserves uniqueness since the end of name contains - // executorId and applicationId - val hostname = name.substring(Math.max(0, name.length - 63)) - val resolvedExecutorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> applicationId(), - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorLabels - val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryMb}M") - .build() - val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverhead}M") - .build() - val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores.toString) - .build() - val executorExtraClasspathEnv = executorExtraClasspath.map { cp => - new EnvVarBuilder() - .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) - .withValue(cp) - .build() - } - val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( - (ENV_EXECUTOR_PORT, executorPort.toString), - (ENV_DRIVER_URL, driverUrl), - // Executor backend expects integral value for executor cores, so round it up to an int. - (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), - (ENV_EXECUTOR_MEMORY, executorMemoryString), - (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId), - (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( - new EnvVarBuilder() - .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build()) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq - val requiredPorts = Seq( - (EXECUTOR_PORT_NAME, executorPort), - (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) - .map(port => { - new ContainerPortBuilder() - .withName(port._1) - .withContainerPort(port._2) - .build() - }) - - val executorContainer = new ContainerBuilder() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy(dockerImagePullPolicy) - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .endResources() - .addAllToEnv(executorEnv.asJava) - .withPorts(requiredPorts.asJava) - .build() - - val executorPod = new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() - - val containerWithExecutorLimitCores = executorLimitCores.map { - limitCores => - val executorCpuLimitQuantity = new QuantityBuilder(false) - .withAmount(limitCores) - .build() - new ContainerBuilder(executorContainer) - .editResources() - .addToLimits("cpu", executorCpuLimitQuantity) - .endResources() - .build() - 
}.getOrElse(executorContainer) - - val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => - new ContainerBuilder(container) - .addNewVolumeMount() - .withName(FilenameUtils.getBaseName(dir)) - .withMountPath(dir) - .endVolumeMount() - .build() - } - }.getOrElse(containerWithExecutorLimitCores) - val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => - new PodBuilder(builder) - .editSpec() - .addNewVolume() - .withName(FilenameUtils.getBaseName(dir)) - .withNewHostPath() - .withPath(dir) - .endHostPath() - .endVolume() - .endSpec() - .build() - } - }.getOrElse(executorPod) - val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = - mountSmallFilesBootstrap.map { bootstrap => - bootstrap.mountSmallFilesSecret( - withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) - val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = - executorInitContainerBootstrap.map { bootstrap => - val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( - PodWithDetachedInitContainer( - withMaybeSmallFilesMountedPod, - new ContainerBuilder().build(), - withMaybeSmallFilesMountedContainer)) - - val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => - plugin.mountResourceStagingServerSecretIntoInitContainer( - podWithDetachedInitContainer.initContainer) - }.getOrElse(podWithDetachedInitContainer.initContainer) - - val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) - - val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => - plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) - }.getOrElse(podWithAttachedInitContainer) - - (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) - - val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( - executorPodWithInitContainer, nodeToLocalTaskCount) - val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity) - .editSpec() - .addToContainers(initBootstrappedExecutorContainer) - .endSpec() - .build() + val executorPod = executorPodFactory.createExecutorPod( + executorId, + applicationId(), + driverUrl, + sc.conf.getExecutorEnv, + shuffleServiceConfig, + driverPod, + nodeToLocalTaskCount) try { - (executorId, kubernetesClient.pods.create(resolvedExecutorPod)) + (executorId, kubernetesClient.pods.create(executorPod)) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) @@ -783,10 +535,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } } } -case class ShuffleServiceConfig( - shuffleNamespace: String, - shuffleLabels: Map[String, String], - shuffleDirs: Seq[String]) private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 @@ -801,14 +549,3 @@ private object KubernetesClusterSchedulerBackend { } } -/** - * These case classes model K8s node affinity syntax for - * preferredDuringSchedulingIgnoredDuringExecution. 
- * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node - */ -case class SchedulerAffinity(nodeAffinity: NodeAffinity) -case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: - Iterable[WeightedPreference]) -case class WeightedPreference(weight: Int, preference: Preference) -case class Preference(matchExpressions: Array[MatchExpression]) -case class MatchExpression(key: String, operator: String, values: Iterable[String]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala new file mode 100644 index 000000000000..126cfdeb5cbd --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants.ANNOTATION_EXECUTOR_NODE_AFFINITY +import org.apache.spark.internal.Logging + +// Strictly an extension of ExecutorPodFactory but extracted out for testing. +private[spark] trait NodeAffinityExecutorPodModifier { + def addNodeAffinityAnnotationIfUseful( + baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod +} + +private[spark] object NodeAffinityExecutorPodModifierImpl + extends NodeAffinityExecutorPodModifier with Logging { + + private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule) + + private def scaleToRange( + value: Int, + baseMin: Double, + baseMax: Double, + rangeMin: Double, + rangeMax: Double): Int = { + (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt + } + override def addNodeAffinityAnnotationIfUseful( + baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod = { + if (nodeToTaskCount.nonEmpty) { + val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) + // Normalize to node affinity weights in 1 to 100 range. 
+      val nodeToWeight = nodeToTaskCount.map {
+        case (node, taskCount) =>
+          (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))
+      }
+      val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
+      // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+      val nodeAffinityJson = OBJECT_MAPPER.writeValueAsString(SchedulerAffinity(NodeAffinity(
+        preferredDuringSchedulingIgnoredDuringExecution =
+          for ((weight, nodes) <- weightToNodes) yield {
+            WeightedPreference(
+              weight,
+              Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))
+          })))
+      // TODO: Use non-annotation syntax when we switch to K8s version 1.6.
+      logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson")
+      new PodBuilder(baseExecutorPod)
+        .editMetadata()
+          .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson)
+          .endMetadata()
+        .build()
+    } else {
+      baseExecutorPod
+    }
+  }
+}
+
+// These case classes model K8s node affinity syntax for
+// preferredDuringSchedulingIgnoredDuringExecution.
+// see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+private case class SchedulerAffinity(nodeAffinity: NodeAffinity)
+private case class NodeAffinity(
+    preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference])
+private case class WeightedPreference(weight: Int, preference: Preference)
+private case class Preference(matchExpressions: Array[MatchExpression])
+private case class MatchExpression(key: String, operator: String, values: Iterable[String])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala
new file mode 100644
index 000000000000..ca1bbbe17076
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +private[spark] case class ShuffleServiceConfig( + shuffleNamespace: String, + shuffleLabels: Map[String, String], + shuffleDirs: Seq[String]) From dc6b186afdb224979c0a62d2e1f9b190b10de9fd Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 11:38:10 -0700 Subject: [PATCH 2/4] Revert change to README --- resource-managers/kubernetes/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index 685ff343fa3b..31b721d19336 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -14,11 +14,14 @@ important matters to keep in mind when developing this feature. # Building Spark with Kubernetes Support -To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile -the Kubernetes core implementation module along with its dependencies: +To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile the Kubernetes core implementation module along with its dependencies: build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests +If this is the first time you compile the Kubernetes core implementation module, run the following command to install the dependencies and compile: + + build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests + To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when building Spark normally. For example, to build Spark against Hadoop 2.7 and Kubernetes: From 3cdba3b87b95360f5029006c05b9014611bb85a2 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 17:23:21 -0700 Subject: [PATCH 3/4] Address comments. --- .../scheduler/cluster/kubernetes/ExecutorPodFactory.scala | 6 +++--- .../kubernetes/NodeAffinityExecutorPodModifier.scala | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index d9e56bc7f237..0a6c0939b540 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -29,7 +29,8 @@ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} import org.apache.spark.util.Utils -// Strictly an extension of KubernetesClusterSchedulerBakcne that is factored out for testing. +// Configures executor pods. Construct one of these with a SparkConf to set up properties that are +// common across all executors. Then, pass in dynamic parameters into createExecutorPod. 
private[spark] trait ExecutorPodFactory { def createExecutorPod( executorId: String, @@ -51,7 +52,6 @@ private[spark] class ExecutorPodFactoryImpl( import ExecutorPodFactoryImpl._ - private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) private val executorExtraClasspath = sparkConf.get( org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) @@ -79,7 +79,7 @@ private[spark] class ExecutorPodFactoryImpl( ConfigurationUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, - "node-selector") + "node selector") private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala index 126cfdeb5cbd..d73bc6cf93aa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala @@ -23,7 +23,8 @@ import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} import org.apache.spark.deploy.kubernetes.constants.ANNOTATION_EXECUTOR_NODE_AFFINITY import org.apache.spark.internal.Logging -// Strictly an extension of ExecutorPodFactory but extracted out for testing. +// Applies a node affinity annotation to executor pods so that pods can be placed optimally for +// locality. private[spark] trait NodeAffinityExecutorPodModifier { def addNodeAffinityAnnotationIfUseful( baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod From 07cfca2aa74669250d0a18a43fd860e0a6a2c885 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 30 Aug 2017 15:06:37 -0700 Subject: [PATCH 4/4] Resolve merge conflicts. Move MiB change to ExecutorPodFactory. 
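
In Kubernetes resource quantities the suffix "M" means decimal megabytes (10^6 bytes) while "Mi" means mebibytes (2^20 bytes); Spark sizes executor memory in MiB, so the pod's memory request and limit should carry the "Mi" suffix. A minimal sketch of the resulting sizing, assuming the usual 10% / 384 MiB overhead defaults (the values below are illustrative and not taken from this patch):

    import io.fabric8.kubernetes.api.model.QuantityBuilder

    object ExecutorMemoryQuantitySketch {
      def main(args: Array[String]): Unit = {
        val executorMemoryMiB = 1024L  // e.g. spark.executor.memory=1g
        // Assumed overhead defaults: max(10% of executor memory, 384 MiB).
        val memoryOverheadMiB = math.max((0.10 * executorMemoryMiB).toLong, 384L)
        val memoryLimit = new QuantityBuilder(false)
          .withAmount(s"${executorMemoryMiB + memoryOverheadMiB}Mi")  // 1408Mi = 1408 * 2^20 bytes
          .build()
        // With the old "M" suffix, Kubernetes would grant 1408 * 10^6 bytes, roughly 5% less
        // memory than intended.
        println(memoryLimit.getAmount)
      }
    }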
--- .../kubernetes/ExecutorPodFactory.scala | 14 +- .../KubernetesClusterSchedulerBackend.scala | 378 ------------------ 2 files changed, 7 insertions(+), 385 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index 0a6c0939b540..6355afa0a504 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -92,16 +92,16 @@ private[spark] class ExecutorPodFactoryImpl( private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) - private val executorMemoryMb = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) private val executorMemoryString = sparkConf.get( org.apache.spark.internal.config.EXECUTOR_MEMORY.key, org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - private val memoryOverheadMb = sparkConf + private val memoryOverheadMiB = sparkConf .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, - MEMORY_OVERHEAD_MIN)) - private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) @@ -126,10 +126,10 @@ private[spark] class ExecutorPodFactoryImpl( SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ executorLabels val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryMb}M") + .withAmount(s"${executorMemoryMiB}Mi") .build() val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverhead}M") + .withAmount(s"${executorMemoryWithOverhead}Mi") .build() val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores.toString) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 0c954e922cee..4a4ec4d284ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -76,38 +76,6 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException("Must specify the driver pod name")) private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) -<<<<<<< HEAD -||||||| merged common ancestors - private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) - private val executorMemoryString = conf.get( - org.apache.spark.internal.config.EXECUTOR_MEMORY.key, - 
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - - private val memoryOverheadMb = conf - .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, - MEMORY_OVERHEAD_MIN)) - private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb - - private val executorCores = conf.getDouble("spark.executor.cores", 1d) - private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) - -======= - private val executorMemoryMiB = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) - private val executorMemoryString = conf.get( - org.apache.spark.internal.config.EXECUTOR_MEMORY.key, - org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - - private val memoryOverheadMiB = conf - .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB)) - private val executorMemoryWithOverheadMiB = executorMemoryMiB + memoryOverheadMiB - - private val executorCores = conf.getDouble("spark.executor.cores", 1d) - private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) - ->>>>>>> origin/branch-2.2-kubernetes private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) @@ -376,7 +344,6 @@ private[spark] class KubernetesClusterSchedulerBackend( */ private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString -<<<<<<< HEAD val executorPod = executorPodFactory.createExecutorPod( executorId, applicationId(), @@ -385,351 +352,6 @@ private[spark] class KubernetesClusterSchedulerBackend( shuffleServiceConfig, driverPod, nodeToLocalTaskCount) -||||||| merged common ancestors - val name = s"$executorPodNamePrefix-exec-$executorId" - - // hostname must be no longer than 63 characters, so take the last 63 characters of the pod - // name as the hostname. This preserves uniqueness since the end of name contains - // executorId and applicationId - val hostname = name.substring(Math.max(0, name.length - 63)) - val resolvedExecutorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> applicationId(), - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorLabels - val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryMb}M") - .build() - val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverhead}M") - .build() - val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores.toString) - .build() - val executorExtraClasspathEnv = executorExtraClasspath.map { cp => - new EnvVarBuilder() - .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) - .withValue(cp) - .build() - } - val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( - (ENV_EXECUTOR_PORT, executorPort.toString), - (ENV_DRIVER_URL, driverUrl), - // Executor backend expects integral value for executor cores, so round it up to an int. 
- (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), - (ENV_EXECUTOR_MEMORY, executorMemoryString), - (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId), - (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( - new EnvVarBuilder() - .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build()) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq - val requiredPorts = Seq( - (EXECUTOR_PORT_NAME, executorPort), - (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) - .map(port => { - new ContainerPortBuilder() - .withName(port._1) - .withContainerPort(port._2) - .build() - }) - - val executorContainer = new ContainerBuilder() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy(dockerImagePullPolicy) - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .endResources() - .addAllToEnv(executorEnv.asJava) - .withPorts(requiredPorts.asJava) - .build() - - val executorPod = new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() - - val containerWithExecutorLimitCores = executorLimitCores.map { - limitCores => - val executorCpuLimitQuantity = new QuantityBuilder(false) - .withAmount(limitCores) - .build() - new ContainerBuilder(executorContainer) - .editResources() - .addToLimits("cpu", executorCpuLimitQuantity) - .endResources() - .build() - }.getOrElse(executorContainer) - - val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => - new ContainerBuilder(container) - .addNewVolumeMount() - .withName(FilenameUtils.getBaseName(dir)) - .withMountPath(dir) - .endVolumeMount() - .build() - } - }.getOrElse(containerWithExecutorLimitCores) - val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => - new PodBuilder(builder) - .editSpec() - .addNewVolume() - .withName(FilenameUtils.getBaseName(dir)) - .withNewHostPath() - .withPath(dir) - .endHostPath() - .endVolume() - .endSpec() - .build() - } - }.getOrElse(executorPod) - val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = - mountSmallFilesBootstrap.map { bootstrap => - bootstrap.mountSmallFilesSecret( - withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) - val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = - executorInitContainerBootstrap.map { bootstrap => - val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( - PodWithDetachedInitContainer( - withMaybeSmallFilesMountedPod, - new ContainerBuilder().build(), - 
withMaybeSmallFilesMountedContainer)) - - val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => - plugin.mountResourceStagingServerSecretIntoInitContainer( - podWithDetachedInitContainer.initContainer) - }.getOrElse(podWithDetachedInitContainer.initContainer) - - val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) - - val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => - plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) - }.getOrElse(podWithAttachedInitContainer) - - (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) - - val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( - executorPodWithInitContainer, nodeToLocalTaskCount) - val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity) - .editSpec() - .addToContainers(initBootstrappedExecutorContainer) - .endSpec() - .build() -======= - val name = s"$executorPodNamePrefix-exec-$executorId" - - // hostname must be no longer than 63 characters, so take the last 63 characters of the pod - // name as the hostname. This preserves uniqueness since the end of name contains - // executorId and applicationId - val hostname = name.substring(Math.max(0, name.length - 63)) - val resolvedExecutorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> applicationId(), - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorLabels - val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryMiB}Mi") - .build() - val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverheadMiB}Mi") - .build() - val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores.toString) - .build() - val executorExtraClasspathEnv = executorExtraClasspath.map { cp => - new EnvVarBuilder() - .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) - .withValue(cp) - .build() - } - val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( - (ENV_EXECUTOR_PORT, executorPort.toString), - (ENV_DRIVER_URL, driverUrl), - // Executor backend expects integral value for executor cores, so round it up to an int. 
- (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), - (ENV_EXECUTOR_MEMORY, executorMemoryString), - (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId), - (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( - new EnvVarBuilder() - .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build()) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq - val requiredPorts = Seq( - (EXECUTOR_PORT_NAME, executorPort), - (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) - .map(port => { - new ContainerPortBuilder() - .withName(port._1) - .withContainerPort(port._2) - .build() - }) - - val executorContainer = new ContainerBuilder() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy(dockerImagePullPolicy) - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .endResources() - .addAllToEnv(executorEnv.asJava) - .withPorts(requiredPorts.asJava) - .build() - - val executorPod = new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() - - val containerWithExecutorLimitCores = executorLimitCores.map { - limitCores => - val executorCpuLimitQuantity = new QuantityBuilder(false) - .withAmount(limitCores) - .build() - new ContainerBuilder(executorContainer) - .editResources() - .addToLimits("cpu", executorCpuLimitQuantity) - .endResources() - .build() - }.getOrElse(executorContainer) - - val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => - new ContainerBuilder(container) - .addNewVolumeMount() - .withName(FilenameUtils.getBaseName(dir)) - .withMountPath(dir) - .endVolumeMount() - .build() - } - }.getOrElse(containerWithExecutorLimitCores) - val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => - new PodBuilder(builder) - .editSpec() - .addNewVolume() - .withName(FilenameUtils.getBaseName(dir)) - .withNewHostPath() - .withPath(dir) - .endHostPath() - .endVolume() - .endSpec() - .build() - } - }.getOrElse(executorPod) - val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = - mountSmallFilesBootstrap.map { bootstrap => - bootstrap.mountSmallFilesSecret( - withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) - val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = - executorInitContainerBootstrap.map { bootstrap => - val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( - PodWithDetachedInitContainer( - withMaybeSmallFilesMountedPod, - new ContainerBuilder().build(), - 
withMaybeSmallFilesMountedContainer)) - - val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => - plugin.mountResourceStagingServerSecretIntoInitContainer( - podWithDetachedInitContainer.initContainer) - }.getOrElse(podWithDetachedInitContainer.initContainer) - - val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) - - val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => - plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) - }.getOrElse(podWithAttachedInitContainer) - - (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) - - val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( - executorPodWithInitContainer, nodeToLocalTaskCount) - val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity) - .editSpec() - .addToContainers(initBootstrappedExecutorContainer) - .endSpec() - .build() ->>>>>>> origin/branch-2.2-kubernetes try { (executorId, kubernetesClient.pods.create(executorPod)) } catch {
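
The extraction in PATCH 1/4 exists so that KubernetesClusterSchedulerBackend can be exercised in tests without constructing real pods. As a minimal sketch of how a test could use the new seam, assuming only the ExecutorPodFactory trait and ShuffleServiceConfig case class introduced above (the RecordingExecutorPodFactory name and its fields are hypothetical and not part of this patch series):

    package org.apache.spark.scheduler.cluster.kubernetes

    import scala.collection.mutable.ArrayBuffer

    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

    // Hypothetical test helper: records each request and returns a trivially small pod,
    // so a suite can assert on the backend's allocation behavior without a real cluster.
    private[spark] class RecordingExecutorPodFactory extends ExecutorPodFactory {

      val requestedExecutorIds = new ArrayBuffer[String]

      override def createExecutorPod(
          executorId: String,
          applicationId: String,
          driverUrl: String,
          executorEnvs: Seq[(String, String)],
          shuffleServiceConfig: Option[ShuffleServiceConfig],
          driverPod: Pod,
          nodeToLocalTaskCount: Map[String, Int]): Pod = {
        requestedExecutorIds += executorId
        new PodBuilder()
          .withNewMetadata()
            .withName(s"stub-exec-$executorId")
            .endMetadata()
          .build()
      }
    }

Because pod construction now sits behind a trait, the backend's allocation logic can be asserted against the recorded executor ids while the Kubernetes client is mocked separately.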