diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala new file mode 100644 index 0000000000000..6355afa0a5041 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.concurrent.atomic.AtomicLong + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} +import org.apache.spark.util.Utils + +// Configures executor pods. Construct one of these with a SparkConf to set up properties that are +// common across all executors. Then, pass in dynamic parameters into createExecutorPod. 
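[Editor's note, not part of the patch: a minimal usage sketch of the factory interface defined just below, following the construction shown later in KubernetesClusterManager and the call site in KubernetesClusterSchedulerBackend. sparkConf, driverPod and kubernetesClient are assumed to be in scope, and every literal value is a placeholder.]

  // Illustrative sketch only (not in this diff). The factory is built once from static
  // configuration, then stamps out one pod per executor with the dynamic parameters.
  val factory: ExecutorPodFactory = new ExecutorPodFactoryImpl(
    sparkConf,
    NodeAffinityExecutorPodModifierImpl,
    mountSmallFilesBootstrap = None,
    executorInitContainerBootstrap = None,
    executorMountInitContainerSecretPlugin = None)
  val pod = factory.createExecutorPod(
    executorId = "1",
    applicationId = "spark-application-1234",
    driverUrl = "spark://CoarseGrainedScheduler@driver-host:7078",  // placeholder driver RPC address
    executorEnvs = Seq.empty,
    shuffleServiceConfig = None,
    driverPod = driverPod,               // the running driver pod, looked up via the k8s client
    nodeToLocalTaskCount = Map.empty)
  kubernetesClient.pods().create(pod)    // as done in KubernetesClusterSchedulerBackend

[Keeping the static configuration in the factory constructor is what lets the scheduler backend below shed all of its pod-construction detail.]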
+private[spark] trait ExecutorPodFactory { + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + shuffleServiceConfig: Option[ShuffleServiceConfig], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl( + sparkConf: SparkConf, + nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier, + mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], + executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], + executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin]) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = sparkConf.get( + org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + + private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( + sparkConf, + KUBERNETES_EXECUTOR_LABEL_PREFIX, + KUBERNETES_EXECUTOR_LABELS, + "executor label") + require( + !executorLabels.contains(SPARK_APP_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( + !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + s" Spark.") + + private val executorAnnotations = + ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + KUBERNETES_EXECUTOR_ANNOTATIONS, + "executor annotation") + private val nodeSelector = + ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockmanagerPort = sparkConf + .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + private val kubernetesDriverPodName = sparkConf + .get(KUBERNETES_DRIVER_POD_NAME) + .getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( + org.apache.spark.internal.config.EXECUTOR_MEMORY.key, + org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf + .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) + private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + shuffleServiceConfig: Option[ShuffleServiceConfig], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { + val name = s"$executorPodNamePrefix-exec-$executorId" + + // hostname must be no longer than 63 characters, so take the last 63 
characters of the pod + // name as the hostname. This preserves uniqueness since the end of name contains + // executorId and applicationId + val hostname = name.substring(Math.max(0, name.length - 63)) + val resolvedExecutorLabels = Map( + SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> applicationId, + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ + executorLabels + val executorMemoryQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryMiB}Mi") + .build() + val executorMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryWithOverhead}Mi") + .build() + val executorCpuQuantity = new QuantityBuilder(false) + .withAmount(executorCores.toString) + .build() + val executorExtraClasspathEnv = executorExtraClasspath.map { cp => + new EnvVarBuilder() + .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) + .withValue(cp) + .build() + } + val executorExtraJavaOptionsEnv = sparkConf + .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) + .map { opts => + val delimitedOpts = Utils.splitCommandString(opts) + delimitedOpts.zipWithIndex.map { + case (opt, index) => + new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + } + }.getOrElse(Seq.empty[EnvVar]) + val executorEnv = (Seq( + (ENV_EXECUTOR_PORT, executorPort.toString), + (ENV_DRIVER_URL, driverUrl), + // Executor backend expects integral value for executor cores, so round it up to an int. + (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), + (ENV_EXECUTOR_MEMORY, executorMemoryString), + (ENV_APPLICATION_ID, applicationId), + (ENV_EXECUTOR_ID, executorId), + (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs) + .map(env => new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build() + ) ++ Seq( + new EnvVarBuilder() + .withName(ENV_EXECUTOR_POD_IP) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) + .build() + ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + val requiredPorts = Seq( + (EXECUTOR_PORT_NAME, executorPort), + (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) + .map(port => { + new ContainerPortBuilder() + .withName(port._1) + .withContainerPort(port._2) + .build() + }) + + val executorContainer = new ContainerBuilder() + .withName(s"executor") + .withImage(executorDockerImage) + .withImagePullPolicy(dockerImagePullPolicy) + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .endResources() + .addAllToEnv(executorEnv.asJava) + .withPorts(requiredPorts.asJava) + .build() + + val executorPod = new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + + val containerWithExecutorLimitCores = executorLimitCores.map { + limitCores => + val executorCpuLimitQuantity = new QuantityBuilder(false) + .withAmount(limitCores) + .build() + new ContainerBuilder(executorContainer) + .editResources() + .addToLimits("cpu", executorCpuLimitQuantity) + 
.endResources() + .build() + }.getOrElse(executorContainer) + + val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => + config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => + new ContainerBuilder(container) + .addNewVolumeMount() + .withName(FilenameUtils.getBaseName(dir)) + .withMountPath(dir) + .endVolumeMount() + .build() + } + }.getOrElse(containerWithExecutorLimitCores) + val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => + config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => + new PodBuilder(builder) + .editSpec() + .addNewVolume() + .withName(FilenameUtils.getBaseName(dir)) + .withNewHostPath() + .withPath(dir) + .endHostPath() + .endVolume() + .endSpec() + .build() + } + }.getOrElse(executorPod) + val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = + mountSmallFilesBootstrap.map { bootstrap => + bootstrap.mountSmallFilesSecret( + withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) + }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) + val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = + executorInitContainerBootstrap.map { bootstrap => + val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + withMaybeSmallFilesMountedPod, + new ContainerBuilder().build(), + withMaybeSmallFilesMountedContainer)) + + val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => + plugin.mountResourceStagingServerSecretIntoInitContainer( + podWithDetachedInitContainer.initContainer) + }.getOrElse(podWithDetachedInitContainer.initContainer) + + val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( + podWithDetachedInitContainer.pod, resolvedInitContainer) + + val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => + plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) + }.getOrElse(podWithAttachedInitContainer) + + (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) + }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) + + val executorPodWithNodeAffinity = + nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( + executorPodWithInitContainer, nodeToLocalTaskCount) + new PodBuilder(executorPodWithNodeAffinity) + .editSpec() + .addToContainers(initBootstrappedExecutorContainer) + .endSpec() + .build() + } +} + +private object ExecutorPodFactoryImpl { + private val DEFAULT_STATIC_PORT = 10000 +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index b89e81bcb0be9..f63d0aeabad3b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -96,12 +96,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + val executorPodFactory = new ExecutorPodFactoryImpl( + sparkConf, + NodeAffinityExecutorPodModifierImpl, + 
mountSmallFilesBootstrap, + executorInitContainerbootStrap, + executorInitContainerSecretVolumePlugin) new KubernetesClusterSchedulerBackend( sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, - executorInitContainerbootStrap, - executorInitContainerSecretVolumePlugin, - mountSmallFilesBootstrap, + executorPodFactory, kubernetesClient) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index a6f2fdcd3c710..4a4ec4d284aec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -33,10 +33,9 @@ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -48,9 +47,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext, - executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], - executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], - mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], + executorPodFactory: ExecutorPodFactory, kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -70,42 +67,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorsToRemove = Collections.newSetFromMap[String]( new ConcurrentHashMap[String, java.lang.Boolean]()).asScala - private val executorExtraClasspath = conf.get( - org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) - private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) - - private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( - conf, - KUBERNETES_EXECUTOR_LABEL_PREFIX, - KUBERNETES_EXECUTOR_LABELS, - "executor label") - require( - !executorLabels.contains(SPARK_APP_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" + - s" reserved for Spark.") - require( - !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + - s" Spark.") - - private val executorAnnotations = - ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( - conf, - KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, - KUBERNETES_EXECUTOR_ANNOTATIONS, - "executor annotation") - private val nodeSelector = - 
ConfigurationUtils.parsePrefixedKeyValuePairs( - conf, - KUBERNETES_NODE_SELECTOR_PREFIX, - "node-selector") private var shufflePodCache: Option[ShufflePodCache] = None - private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) - private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) - private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) - private val blockmanagerPort = conf - .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) @@ -113,22 +76,8 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException("Must specify the driver pod name")) private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) - private val executorMemoryMiB = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) - private val executorMemoryString = conf.get( - org.apache.spark.internal.config.EXECUTOR_MEMORY.key, - org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - - private val memoryOverheadMiB = conf - .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB)) - private val executorMemoryWithOverheadMiB = executorMemoryMiB + memoryOverheadMiB - - private val executorCores = conf.getDouble("spark.executor.cores", 1d) - private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) - private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) + ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). @@ -180,7 +129,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorWatchResource = new AtomicReference[Closeable] protected var totalExpectedExecutors = new AtomicInteger(0) - private val driverUrl = RpcEndpointAddress( sc.getConf.get("spark.driver.host"), sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), @@ -273,8 +221,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule) - private def getShuffleClient(): KubernetesExternalShuffleClient = { new KubernetesExternalShuffleClient( SparkTransportConf.fromSparkConf(conf, "shuffle"), @@ -388,37 +334,6 @@ private[spark] class KubernetesClusterSchedulerBackend( nodeToLocalTaskCount.toMap[String, Int] } - private def addNodeAffinityAnnotationIfUseful( - baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod = { - def scaleToRange(value: Int, baseMin: Double, baseMax: Double, - rangeMin: Double, rangeMax: Double): Int = - (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt - - if (nodeToTaskCount.nonEmpty) { - val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) - // Normalize to node affinity weights in 1 to 100 range. 
- val nodeToWeight = nodeToTaskCount.map{ - case (node, taskCount) => - (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))} - val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys) - // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node - val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity( - preferredDuringSchedulingIgnoredDuringExecution = - for ((weight, nodes) <- weightToNodes) yield - WeightedPreference(weight, - Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) - ))) - // TODO: Use non-annotation syntax when we switch to K8s version 1.6. - logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") - new PodBuilder(baseExecutorPod).editMetadata() - .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) - .endMetadata() - .build() - } else { - baseExecutorPod - } - } - /** * Allocates a new executor pod * @@ -429,179 +344,16 @@ private[spark] class KubernetesClusterSchedulerBackend( */ private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = { val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString - val name = s"$executorPodNamePrefix-exec-$executorId" - - // hostname must be no longer than 63 characters, so take the last 63 characters of the pod - // name as the hostname. This preserves uniqueness since the end of name contains - // executorId and applicationId - val hostname = name.substring(Math.max(0, name.length - 63)) - val resolvedExecutorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> applicationId(), - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorLabels - val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryMiB}Mi") - .build() - val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryWithOverheadMiB}Mi") - .build() - val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores.toString) - .build() - val executorExtraClasspathEnv = executorExtraClasspath.map { cp => - new EnvVarBuilder() - .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) - .withValue(cp) - .build() - } - val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( - (ENV_EXECUTOR_PORT, executorPort.toString), - (ENV_DRIVER_URL, driverUrl), - // Executor backend expects integral value for executor cores, so round it up to an int. 
- (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), - (ENV_EXECUTOR_MEMORY, executorMemoryString), - (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId), - (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq) - .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build() - ) ++ Seq( - new EnvVarBuilder() - .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom(new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build()) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq - val requiredPorts = Seq( - (EXECUTOR_PORT_NAME, executorPort), - (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) - .map(port => { - new ContainerPortBuilder() - .withName(port._1) - .withContainerPort(port._2) - .build() - }) - - val executorContainer = new ContainerBuilder() - .withName(s"executor") - .withImage(executorDockerImage) - .withImagePullPolicy(dockerImagePullPolicy) - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .endResources() - .addAllToEnv(executorEnv.asJava) - .withPorts(requiredPorts.asJava) - .build() - - val executorPod = new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() - - val containerWithExecutorLimitCores = executorLimitCores.map { - limitCores => - val executorCpuLimitQuantity = new QuantityBuilder(false) - .withAmount(limitCores) - .build() - new ContainerBuilder(executorContainer) - .editResources() - .addToLimits("cpu", executorCpuLimitQuantity) - .endResources() - .build() - }.getOrElse(executorContainer) - - val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => - new ContainerBuilder(container) - .addNewVolumeMount() - .withName(FilenameUtils.getBaseName(dir)) - .withMountPath(dir) - .endVolumeMount() - .build() - } - }.getOrElse(containerWithExecutorLimitCores) - val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => - new PodBuilder(builder) - .editSpec() - .addNewVolume() - .withName(FilenameUtils.getBaseName(dir)) - .withNewHostPath() - .withPath(dir) - .endHostPath() - .endVolume() - .endSpec() - .build() - } - }.getOrElse(executorPod) - val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = - mountSmallFilesBootstrap.map { bootstrap => - bootstrap.mountSmallFilesSecret( - withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) - val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = - executorInitContainerBootstrap.map { bootstrap => - val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( - PodWithDetachedInitContainer( - withMaybeSmallFilesMountedPod, - new ContainerBuilder().build(), - 
withMaybeSmallFilesMountedContainer)) - - val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => - plugin.mountResourceStagingServerSecretIntoInitContainer( - podWithDetachedInitContainer.initContainer) - }.getOrElse(podWithDetachedInitContainer.initContainer) - - val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) - - val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => - plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) - }.getOrElse(podWithAttachedInitContainer) - - (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) - - val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( - executorPodWithInitContainer, nodeToLocalTaskCount) - val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity) - .editSpec() - .addToContainers(initBootstrappedExecutorContainer) - .endSpec() - .build() + val executorPod = executorPodFactory.createExecutorPod( + executorId, + applicationId(), + driverUrl, + sc.conf.getExecutorEnv, + shuffleServiceConfig, + driverPod, + nodeToLocalTaskCount) try { - (executorId, kubernetesClient.pods.create(resolvedExecutorPod)) + (executorId, kubernetesClient.pods.create(executorPod)) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) @@ -783,10 +535,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } } } -case class ShuffleServiceConfig( - shuffleNamespace: String, - shuffleLabels: Map[String, String], - shuffleDirs: Seq[String]) private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 @@ -801,14 +549,3 @@ private object KubernetesClusterSchedulerBackend { } } -/** - * These case classes model K8s node affinity syntax for - * preferredDuringSchedulingIgnoredDuringExecution. - * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node - */ -case class SchedulerAffinity(nodeAffinity: NodeAffinity) -case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution: - Iterable[WeightedPreference]) -case class WeightedPreference(weight: Int, preference: Preference) -case class Preference(matchExpressions: Array[MatchExpression]) -case class MatchExpression(key: String, operator: String, values: Iterable[String]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala new file mode 100644 index 0000000000000..d73bc6cf93aa2 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/NodeAffinityExecutorPodModifier.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants.ANNOTATION_EXECUTOR_NODE_AFFINITY +import org.apache.spark.internal.Logging + +// Applies a node affinity annotation to executor pods so that pods can be placed optimally for +// locality. +private[spark] trait NodeAffinityExecutorPodModifier { + def addNodeAffinityAnnotationIfUseful( + baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod +} + +private[spark] object NodeAffinityExecutorPodModifierImpl + extends NodeAffinityExecutorPodModifier with Logging { + + private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule) + + private def scaleToRange( + value: Int, + baseMin: Double, + baseMax: Double, + rangeMin: Double, + rangeMax: Double): Int = { + (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt + } + override def addNodeAffinityAnnotationIfUseful( + baseExecutorPod: Pod, nodeToTaskCount: Map[String, Int]): Pod = { + if (nodeToTaskCount.nonEmpty) { + val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) + // Normalize to node affinity weights in 1 to 100 range. + val nodeToWeight = nodeToTaskCount.map { + case (node, taskCount) => + (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100)) + } + val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys) + // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node + val nodeAffinityJson = OBJECT_MAPPER.writeValueAsString(SchedulerAffinity(NodeAffinity( + preferredDuringSchedulingIgnoredDuringExecution = + for ((weight, nodes) <- weightToNodes) yield { + WeightedPreference( + weight, + Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes)))) + }))) + // TODO: Use non-annotation syntax when we switch to K8s version 1.6. + logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson") + new PodBuilder(baseExecutorPod) + .editMetadata() + .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson) + .endMetadata() + .build() + } else { + baseExecutorPod + } + } +} + +// These case classes model K8s node affinity syntax for +// preferredDuringSchedulingIgnoredDuringExecution.
+// see https://kubernetes.io/docs/concepts/configuration/assign-pod-node +private case class SchedulerAffinity(nodeAffinity: NodeAffinity) +private case class NodeAffinity( + preferredDuringSchedulingIgnoredDuringExecution: Iterable[WeightedPreference]) +private case class WeightedPreference(weight: Int, preference: Preference) +private case class Preference(matchExpressions: Array[MatchExpression]) +private case class MatchExpression(key: String, operator: String, values: Iterable[String]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala new file mode 100644 index 0000000000000..ca1bbbe17076f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShuffleServiceConfig.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +private[spark] case class ShuffleServiceConfig( + shuffleNamespace: String, + shuffleLabels: Map[String, String], + shuffleDirs: Seq[String])
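[Editor's note, not part of the patch: a self-contained sketch, with hypothetical node names and task counts, of how NodeAffinityExecutorPodModifierImpl normalizes pending-task counts into the 1-to-100 weight range that Kubernetes accepts for preferredDuringSchedulingIgnoredDuringExecution, before grouping nodes by weight and serializing the result into the pod annotation.]

  object NodeAffinityWeightExample {
    // Mirrors the private scaleToRange helper above: a linear map from
    // [baseMin, baseMax] onto [rangeMin, rangeMax], truncated to Int.
    def scaleToRange(
        value: Int,
        baseMin: Double,
        baseMax: Double,
        rangeMin: Double,
        rangeMax: Double): Int =
      (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt

    def main(args: Array[String]): Unit = {
      // Hypothetical pending-task counts per node (placeholders, not real cluster data).
      val nodeToTaskCount = Map("node-a" -> 1, "node-b" -> 3, "node-c" -> 6)
      val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)  // 10
      val nodeToWeight = nodeToTaskCount.map { case (node, taskCount) =>
        (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))
      }
      // Prints Map(node-a -> 1, node-b -> 23, node-c -> 56): the node with the most pending
      // tasks gets the highest preferred-scheduling weight; nodes sharing a weight are then
      // grouped into one matchExpression on kubernetes.io/hostname in the annotation JSON.
      println(nodeToWeight)
    }
  }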