From 82bcee60ae2209d4338006ec2dea4332ccd484d3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 22 Aug 2017 17:26:53 -0700 Subject: [PATCH 1/7] Extract more of the shuffle management to a different class. More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle. --- .../KubernetesExternalShuffleClient.java | 60 +------ .../KubernetesExternalShuffleClientImpl.java | 81 +++++++++ .../kubernetes/ExecutorPodFactory.scala | 39 +---- .../kubernetes/KubernetesClusterManager.scala | 19 ++- .../KubernetesClusterSchedulerBackend.scala | 82 ++------- .../KubernetesExternalShuffleManager.scala | 155 ++++++++++++++++++ .../cluster/kubernetes/ShufflePodCache.scala | 93 ----------- ...ubernetesExternalShuffleServiceSuite.scala | 4 +- 8 files changed, 283 insertions(+), 250 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java index f50b0d3ecb00..38ba59fdce77 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java @@ -14,66 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.network.shuffle.kubernetes; -import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.sasl.SecretKeyHolder; -import org.apache.spark.network.shuffle.ExternalShuffleClient; -import org.apache.spark.network.shuffle.protocol.RegisterDriver; -import org.apache.spark.network.util.TransportConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * A client for talking to the external shuffle service in Kubernetes cluster mode. - * - * This is used by the each Spark executor to register with a corresponding external - * shuffle service on the cluster. The purpose is for cleaning up shuffle files - * reliably if the application exits unexpectedly. - */ -public class KubernetesExternalShuffleClient extends ExternalShuffleClient { - private static final Logger logger = LoggerFactory - .getLogger(KubernetesExternalShuffleClient.class); - - /** - * Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. - * Please refer to docs on {@link ExternalShuffleClient} for more information. 
- */ - public KubernetesExternalShuffleClient( - TransportConf conf, - SecretKeyHolder secretKeyHolder, - boolean saslEnabled) { - super(conf, secretKeyHolder, saslEnabled); - } - - public void registerDriverWithShuffleService(String host, int port) - throws IOException, InterruptedException { - checkInit(); - ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer(); - TransportClient client = clientFactory.createClient(host, port); - client.sendRpc(registerDriver, new RegisterDriverCallback()); - } - private class RegisterDriverCallback implements RpcResponseCallback { - @Override - public void onSuccess(ByteBuffer response) { - logger.info("Successfully registered app " + appId + " with external shuffle service."); - } +public interface KubernetesExternalShuffleClient extends Closeable { - @Override - public void onFailure(Throwable e) { - logger.warn("Unable to register app " + appId + " with external shuffle service. " + - "Please manually remove shuffle data after driver exit. Error: " + e); - } - } + void init(String appId); - @Override - public void close() { - super.close(); - } + void registerDriverWithShuffleService(String host, int port) throws IOException, InterruptedException; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java new file mode 100644 index 000000000000..f7f6d491b484 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.kubernetes; + +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.shuffle.ExternalShuffleClient; +import org.apache.spark.network.shuffle.protocol.RegisterDriver; +import org.apache.spark.network.util.TransportConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A client for talking to the external shuffle service in Kubernetes cluster mode. + * + * This is used by the each Spark executor to register with a corresponding external + * shuffle service on the cluster. The purpose is for cleaning up shuffle files + * reliably if the application exits unexpectedly. 
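+ *
+ * Expected call sequence, as driven by KubernetesExternalShuffleManagerImpl in
+ * this change (the variable names here are illustrative only):
+ *
+ *   shuffleClient.init(appId);
+ *   shuffleClient.registerDriverWithShuffleService(shufflePodIp, shufflePort);
+ *   // ... later, on driver shutdown:
+ *   shuffleClient.close();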
+ */ +public class KubernetesExternalShuffleClientImpl + extends ExternalShuffleClient implements KubernetesExternalShuffleClient { + + private static final Logger logger = LoggerFactory + .getLogger(KubernetesExternalShuffleClientImpl.class); + + /** + * Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. + * Please refer to docs on {@link ExternalShuffleClient} for more information. + */ + public KubernetesExternalShuffleClientImpl( + TransportConf conf, + SecretKeyHolder secretKeyHolder, + boolean saslEnabled) { + super(conf, secretKeyHolder, saslEnabled); + } + + public void registerDriverWithShuffleService(String host, int port) + throws IOException, InterruptedException { + checkInit(); + ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer(); + TransportClient client = clientFactory.createClient(host, port); + client.sendRpc(registerDriver, new RegisterDriverCallback()); + } + + private class RegisterDriverCallback implements RpcResponseCallback { + @Override + public void onSuccess(ByteBuffer response) { + logger.info("Successfully registered app " + appId + " with external shuffle service."); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Unable to register app " + appId + " with external shuffle service. " + + "Please manually remove shuffle data after driver exit. Error: " + e); + } + } + + @Override + public void close() { + super.close(); + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala index 6355afa0a504..6c6bccedaa26 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala @@ -37,7 +37,6 @@ private[spark] trait ExecutorPodFactory { applicationId: String, driverUrl: String, executorEnvs: Seq[(String, String)], - shuffleServiceConfig: Option[ShuffleServiceConfig], driverPod: Pod, nodeToLocalTaskCount: Map[String, Int]): Pod } @@ -47,7 +46,8 @@ private[spark] class ExecutorPodFactoryImpl( nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier, mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], - executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin]) + executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], + shuffleManager: Option[KubernetesExternalShuffleManager]) extends ExecutorPodFactory { import ExecutorPodFactoryImpl._ @@ -111,7 +111,6 @@ private[spark] class ExecutorPodFactoryImpl( applicationId: String, driverUrl: String, executorEnvs: Seq[(String, String)], - shuffleServiceConfig: Option[ShuffleServiceConfig], driverPod: Pod, nodeToLocalTaskCount: Map[String, Int]): Pod = { val name = s"$executorPodNamePrefix-exec-$executorId" @@ -179,6 +178,9 @@ private[spark] class ExecutorPodFactoryImpl( .withContainerPort(port._2) .build() }) + val shuffleVolumesWithMounts = + shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts) + .getOrElse(Seq.empty) val executorContainer = new ContainerBuilder() .withName(s"executor") @@ -191,6 +193,7 @@ private[spark] class ExecutorPodFactoryImpl( .endResources() 
.addAllToEnv(executorEnv.asJava) .withPorts(requiredPorts.asJava) + .addAllToVolumeMounts(shuffleVolumesWithMounts.map(_._2).asJava) .build() val executorPod = new PodBuilder() @@ -211,6 +214,7 @@ private[spark] class ExecutorPodFactoryImpl( .withHostname(hostname) .withRestartPolicy("Never") .withNodeSelector(nodeSelector.asJava) + .addAllToVolumes(shuffleVolumesWithMounts.map(_._1).asJava) .endSpec() .build() @@ -225,36 +229,11 @@ private[spark] class ExecutorPodFactoryImpl( .endResources() .build() }.getOrElse(executorContainer) - - val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) => - new ContainerBuilder(container) - .addNewVolumeMount() - .withName(FilenameUtils.getBaseName(dir)) - .withMountPath(dir) - .endVolumeMount() - .build() - } - }.getOrElse(containerWithExecutorLimitCores) - val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config => - config.shuffleDirs.foldLeft(executorPod) { (builder, dir) => - new PodBuilder(builder) - .editSpec() - .addNewVolume() - .withName(FilenameUtils.getBaseName(dir)) - .withNewHostPath() - .withPath(dir) - .endHostPath() - .endVolume() - .endSpec() - .build() - } - }.getOrElse(executorPod) val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = mountSmallFilesBootstrap.map { bootstrap => bootstrap.mountSmallFilesSecret( - withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) + executorPod, containerWithExecutorLimitCores) + }.getOrElse((executorPod, containerWithExecutorLimitCores)) val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = executorInitContainerBootstrap.map { bootstrap => val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index f63d0aeabad3..56102afaa2ad 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -26,7 +26,10 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl import org.apache.spark.internal.Logging +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} +import org.apache.spark.util.Utils private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -96,16 +99,30 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + + val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) { + val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( + SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), 
+ sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled()) + Some(new KubernetesExternalShuffleManagerImpl( + sparkConf, + kubernetesClient, + kubernetesExternalShuffleClient)) + } else None + val executorPodFactory = new ExecutorPodFactoryImpl( sparkConf, NodeAffinityExecutorPodModifierImpl, mountSmallFilesBootstrap, executorInitContainerbootStrap, - executorInitContainerSecretVolumePlugin) + executorInitContainerSecretVolumePlugin, + kubernetesShuffleManager) new KubernetesClusterSchedulerBackend( sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, executorPodFactory, + kubernetesShuffleManager, kubernetesClient) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 4a4ec4d284ae..79837c0e2e80 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -37,7 +37,7 @@ import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} @@ -48,6 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext, executorPodFactory: ExecutorPodFactory, + shuffleManager: Option[KubernetesExternalShuffleManager], kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -67,7 +68,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorsToRemove = Collections.newSetFromMap[String]( new ConcurrentHashMap[String, java.lang.Boolean]()).asScala - private var shufflePodCache: Option[ShufflePodCache] = None private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) private val kubernetesDriverPodName = conf @@ -88,37 +88,6 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException(s"Executor cannot find driver pod", throwable) } - private val shuffleServiceConfig: Option[ShuffleServiceConfig] = - if (Utils.isDynamicAllocationEnabled(sc.conf)) { - val shuffleNamespace = conf.get(KUBERNETES_SHUFFLE_NAMESPACE) - val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs( - conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key, - "shuffle-labels") - if (parsedShuffleLabels.isEmpty) { - throw new SparkException(s"Dynamic allocation enabled " + - s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") - } - - val shuffleDirs = conf.get(KUBERNETES_SHUFFLE_DIR).map { - _.split(",") - }.getOrElse(Utils.getConfiguredLocalDirs(conf)) - Some( - ShuffleServiceConfig(shuffleNamespace, - parsedShuffleLabels, - shuffleDirs)) - } else { - None - } - - // A client for talking to 
the external shuffle service - private val kubernetesExternalShuffleClient: Option[KubernetesExternalShuffleClient] = { - if (Utils.isDynamicAllocationEnabled(sc.conf)) { - Some(getShuffleClient()) - } else { - None - } - } - override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 @@ -221,13 +190,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private def getShuffleClient(): KubernetesExternalShuffleClient = { - new KubernetesExternalShuffleClient( - SparkTransportConf.fromSparkConf(conf, "shuffle"), - sc.env.securityManager, - sc.env.securityManager.isAuthenticationEnabled()) - } - private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) @@ -260,23 +222,17 @@ private[spark] class KubernetesClusterSchedulerBackend( allocator.scheduleWithFixedDelay( allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS) + shuffleManager.foreach(_.start(applicationId())) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) - } else { - shufflePodCache = shuffleServiceConfig - .map { config => new ShufflePodCache( - kubernetesClient, config.shuffleNamespace, config.shuffleLabels) } - shufflePodCache.foreach(_.start()) - kubernetesExternalShuffleClient.foreach(_.init(applicationId())) } } override def stop(): Unit = { // stop allocation of new resources and caches. allocator.shutdown() - shufflePodCache.foreach(_.stop()) - kubernetesExternalShuffleClient.foreach(_.close()) + shuffleManager.foreach(_.stop()) // send stop message to executors so they shut down cleanly super.stop() @@ -349,7 +305,6 @@ private[spark] class KubernetesClusterSchedulerBackend( applicationId(), driverUrl, sc.conf.getExecutorEnv, - shuffleServiceConfig, driverPod, nodeToLocalTaskCount) try { @@ -500,35 +455,26 @@ private[spark] class KubernetesClusterSchedulerBackend( override def isDefinedAt(msg: Any): Boolean = { msg match { case RetrieveSparkAppConfig(executorId) => - Utils.isDynamicAllocationEnabled(sc.conf) + shuffleManager.isDefined case _ => false } } override def apply(msg: Any): Unit = { msg match { - case RetrieveSparkAppConfig(executorId) => - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - var resolvedProperties = sparkProperties - val runningExecutorPod = kubernetesClient + case RetrieveSparkAppConfig(executorId) if shuffleManager.isDefined => + val runningExecutorPod = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + kubernetesClient .pods() .withName(runningExecutorsToPods(executorId).getMetadata.getName) .get() - val nodeName = runningExecutorPod.getSpec.getNodeName - val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName) - - // Inform the shuffle pod about this application so it can watch. 
- kubernetesExternalShuffleClient.foreach( - _.registerDriverWithShuffleService(shufflePodIp, externalShufflePort)) - - resolvedProperties = resolvedProperties ++ Seq( - (SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)) - - val reply = SparkAppConfig( - resolvedProperties, - SparkEnv.get.securityManager.getIOEncryptionKey()) - context.reply(reply) } + val shuffleSpecifiProperties = shuffleManager.get + .getShuffleServiceConfigurationForExecutor(runningExecutorPod) + val reply = SparkAppConfig( + sparkProperties ++ shuffleSpecifiProperties, + SparkEnv.get.securityManager.getIOEncryptionKey()) + context.reply(reply) } } }.orElse(super.receiveAndReply(context)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala new file mode 100644 index 000000000000..268f680e0e70 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import io.fabric8.kubernetes.api.model.{Pod, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness +import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.kubernetes.ConfigurationUtils +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient +import org.apache.spark.util.Utils + +private[spark] trait KubernetesExternalShuffleManager { + + def start(appId: String): Unit + + def stop(): Unit + + /** + * Returns the properties that should be applied for this executor pod, given that + * this executor will need to communicate with an external shuffle service. + * + * In practice, this seq will always have a size of 1, but since this method's semantics are that + * the returned values are key-value pairs to apply as properties, it is clearer to express + * this as a collection. 
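+   *
+   * For example, the implementation below returns
+   * Seq((SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)), pointing each executor
+   * at the shuffle service pod running on its own node.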
+ */ + def getShuffleServiceConfigurationForExecutor(executorPod: Pod): Seq[(String, String)] + + def getExecutorShuffleDirVolumesWithMounts: Seq[(Volume, VolumeMount)] + +} + +private[spark] class KubernetesExternalShuffleManagerImpl( + sparkConf: SparkConf, + client: KubernetesClient, + shuffleClient: KubernetesExternalShuffleClient) + extends KubernetesExternalShuffleManager with Logging { + + private val shuffleNamespace = sparkConf.get(KUBERNETES_SHUFFLE_NAMESPACE) + private val shufflePodLabels = ConfigurationUtils.parseKeyValuePairs( + sparkConf.get(KUBERNETES_SHUFFLE_LABELS), + KUBERNETES_SHUFFLE_LABELS.key, + "shuffle-labels") + if (shufflePodLabels.isEmpty) { + throw new SparkException(s"Dynamic allocation enabled " + + s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") + } + private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337) + private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map { + _.split(",") + }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf)) + private var shufflePodCache = scala.collection.mutable.Map[String, String]() + private var watcher: Watch = _ + + override def start(appId: String): Unit = { + // seed the initial cache. + val pods = client.pods().inNamespace(shuffleNamespace).withLabels(shufflePodLabels.asJava).list() + pods.getItems.asScala.foreach { + pod => + if (Readiness.isReady(pod)) { + addShufflePodToCache(pod) + } else { + logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " + + s"on node ${pod.getSpec.getNodeName}") + } + } + + watcher = client + .pods() + .inNamespace(shuffleNamespace) + .withLabels(shufflePodLabels.asJava) + .watch(new Watcher[Pod] { + override def eventReceived(action: Watcher.Action, p: Pod): Unit = { + action match { + case Action.DELETED | Action.ERROR => + shufflePodCache.remove(p.getSpec.getNodeName) + case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) => + addShufflePodToCache(p) + } + } + override def onClose(e: KubernetesClientException): Unit = {} + }) + shuffleClient.init(appId) + } + + private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized { + if (shufflePodCache.contains(pod.getSpec.getNodeName)) { + val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get + logError(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + + throw new SparkException(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"${registeredPodName} on ${pod.getSpec.getNodeName}") + } else { + shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP + } + } + + override def stop(): Unit = { + watcher.close() + shuffleClient.close() + } + + override def getShuffleServiceConfigurationForExecutor(executorPod: Pod) + : Seq[(String, String)] = { + val nodeName = executorPod.getSpec.getNodeName + val shufflePodIp = shufflePodCache.synchronized { + shufflePodCache.get(nodeName).getOrElse( + throw new SparkException(s"Unable to find shuffle pod on node $nodeName")) + } + // Inform the shuffle pod about this application so it can watch. 
+ shuffleClient.registerDriverWithShuffleService(shufflePodIp, externalShufflePort) + Seq((SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)) + } + + override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { + shuffleDirs.zipWithIndex.map { + case (shuffleDir, shuffleDirIndex) => + val volumeName = s"${FilenameUtils.getBaseName(shuffleDir)}-$shuffleDirIndex}" + val volume = new VolumeBuilder() + .withName(volumeName) + .withNewHostPath(shuffleDir) + .build() + val volumeMount = new VolumeMountBuilder() + .withName(volumeName) + .withMountPath(shuffleDir) + .build() + (volume, volumeMount) + } + } +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala deleted file mode 100644 index 15e02664589e..000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.kubernetes - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.Pod -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} -import io.fabric8.kubernetes.client.Watcher.Action -import io.fabric8.kubernetes.client.internal.readiness.Readiness - -import org.apache.spark.SparkException -import org.apache.spark.internal.Logging - -private[spark] class ShufflePodCache ( - client: KubernetesClient, - dsNamespace: String, - dsLabels: Map[String, String]) extends Logging { - - private var shufflePodCache = scala.collection.mutable.Map[String, String]() - private var watcher: Watch = _ - - def start(): Unit = { - // seed the initial cache. 
- val pods = client.pods() - .inNamespace(dsNamespace).withLabels(dsLabels.asJava).list() - pods.getItems.asScala.foreach { - pod => - if (Readiness.isReady(pod)) { - addShufflePodToCache(pod) - } else { - logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " + - s"on node ${pod.getSpec.getNodeName}") - } - } - - watcher = client - .pods() - .inNamespace(dsNamespace) - .withLabels(dsLabels.asJava) - .watch(new Watcher[Pod] { - override def eventReceived(action: Watcher.Action, p: Pod): Unit = { - action match { - case Action.DELETED | Action.ERROR => - shufflePodCache.remove(p.getSpec.getNodeName) - case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) => - addShufflePodToCache(p) - } - } - override def onClose(e: KubernetesClientException): Unit = {} - }) - } - - private def addShufflePodToCache(pod: Pod): Unit = { - if (shufflePodCache.contains(pod.getSpec.getNodeName)) { - val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get - logError(s"Ambiguous specification of shuffle service pod. " + - s"Found multiple matching pods: ${pod.getMetadata.getName}, " + - s"${registeredPodName} on ${pod.getSpec.getNodeName}") - - throw new SparkException(s"Ambiguous specification of shuffle service pod. " + - s"Found multiple matching pods: ${pod.getMetadata.getName}, " + - s"${registeredPodName} on ${pod.getSpec.getNodeName}") - } else { - shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP - } - } - - def stop(): Unit = { - watcher.close() - } - - def getShufflePodForExecutor(executorNode: String): String = { - shufflePodCache.get(executorNode) - .getOrElse(throw new SparkException(s"Unable to find shuffle pod on node $executorNode")) - } -} - diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala index 425ba58a65d1..6dedf7595162 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala @@ -20,7 +20,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl private[spark] class KubernetesExternalShuffleServiceSuite extends SparkFunSuite { @@ -32,7 +32,7 @@ private[spark] class KubernetesExternalShuffleServiceSuite extends SparkFunSuite SPARK_CONF, new SecurityManager(SPARK_CONF)) - val shuffleClient = new KubernetesExternalShuffleClient( + val shuffleClient = new KubernetesExternalShuffleClientImpl( SparkTransportConf.fromSparkConf(SPARK_CONF, "shuffle"), new SecurityManager(SPARK_CONF), false) From dd7bb02f04d245abf2a9034e2641be1c982d8ffd Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 22 Aug 2017 17:33:35 -0700 Subject: [PATCH 2/7] Fix scalastyle --- .../shuffle/kubernetes/KubernetesExternalShuffleClient.java | 1 + .../kubernetes/KubernetesExternalShuffleManager.scala | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff 
--git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java index 38ba59fdce77..0889dd68fa33 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.network.shuffle.kubernetes; import java.io.Closeable; diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala index 268f680e0e70..1a80bf5709e5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala @@ -75,7 +75,10 @@ private[spark] class KubernetesExternalShuffleManagerImpl( override def start(appId: String): Unit = { // seed the initial cache. - val pods = client.pods().inNamespace(shuffleNamespace).withLabels(shufflePodLabels.asJava).list() + val pods = client.pods() + .inNamespace(shuffleNamespace) + .withLabels(shufflePodLabels.asJava) + .list() pods.getItems.asScala.foreach { pod => if (Readiness.isReady(pod)) { From 7a2c3bdbb628695edfdbf1156e07043018b2d5bc Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 22 Aug 2017 17:38:52 -0700 Subject: [PATCH 3/7] Add override annotation --- .../shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java index f7f6d491b484..65e6adffcb06 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java @@ -53,6 +53,7 @@ public KubernetesExternalShuffleClientImpl( super(conf, secretKeyHolder, saslEnabled); } + @Override public void registerDriverWithShuffleService(String host, int port) throws IOException, InterruptedException { checkInit(); From b22f241dd2f169ca092468fa5510c490380c2207 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 22 Aug 2017 19:50:33 -0700 Subject: [PATCH 4/7] Fix Java style --- .../shuffle/kubernetes/KubernetesExternalShuffleClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java index 0889dd68fa33..e9e94c1855a5 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java @@ -22,7 +22,8 @@ public interface KubernetesExternalShuffleClient extends Closeable { - void init(String appId); + void init(String appId); - void registerDriverWithShuffleService(String host, int port) throws IOException, InterruptedException; + void registerDriverWithShuffleService(String host, int port) + throws IOException, InterruptedException; } From b95f84b4878732871bf9a9d9e09e55c2126c722a Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 23 Aug 2017 14:17:09 -0700 Subject: [PATCH 5/7] Remove unused imports. --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 79837c0e2e80..d7a35f044e91 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -22,22 +22,16 @@ import java.util.Collections import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action -import org.apache.commons.io.FilenameUtils import scala.collection.{concurrent, mutable} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} From 7b5b13a17adf3f28155565670be8dd6ee5b34957 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 11:13:33 -0700 Subject: [PATCH 6/7] Move volume index to the beginning to satisfy index --- .../cluster/kubernetes/KubernetesExternalShuffleManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala index 1a80bf5709e5..d00783b84a94 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesExternalShuffleManager.scala @@ -142,7 +142,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl( override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) => - val volumeName = s"${FilenameUtils.getBaseName(shuffleDir)}-$shuffleDirIndex}" + val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" val volume = new VolumeBuilder() .withName(volumeName) .withNewHostPath(shuffleDir) From e7a460e31dcc3ce03682f132ffebdfe68089b3a4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 29 Aug 2017 17:42:27 -0700 Subject: [PATCH 7/7] Address PR comments. --- .../kubernetes/KubernetesExternalShuffleClientImpl.java | 7 +------ .../kubernetes/KubernetesClusterSchedulerBackend.scala | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java index 65e6adffcb06..4302eff0c31f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClientImpl.java @@ -43,7 +43,7 @@ public class KubernetesExternalShuffleClientImpl .getLogger(KubernetesExternalShuffleClientImpl.class); /** - * Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. + * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. * Please refer to docs on {@link ExternalShuffleClient} for more information. */ public KubernetesExternalShuffleClientImpl( @@ -74,9 +74,4 @@ public void onFailure(Throwable e) { "Please manually remove shuffle data after driver exit. Error: " + e); } } - - @Override - public void close() { - super.close(); - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index d7a35f044e91..1593a9a842e9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -463,10 +463,10 @@ private[spark] class KubernetesClusterSchedulerBackend( .withName(runningExecutorsToPods(executorId).getMetadata.getName) .get() } - val shuffleSpecifiProperties = shuffleManager.get + val shuffleSpecificProperties = shuffleManager.get .getShuffleServiceConfigurationForExecutor(runningExecutorPod) val reply = SparkAppConfig( - sparkProperties ++ shuffleSpecifiProperties, + sparkProperties ++ shuffleSpecificProperties, SparkEnv.get.securityManager.getIOEncryptionKey()) context.reply(reply) }
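This series leaves KubernetesClusterSchedulerBackend depending only on the small KubernetesExternalShuffleManager trait, which makes the shuffle path easy to stub out. Below is a minimal sketch, assuming only the trait as defined in PATCH 1/7 (the NoOpKubernetesExternalShuffleManager name is hypothetical and not part of these patches), of a no-op implementation that lets the backend be exercised without a live shuffle service:

    package org.apache.spark.scheduler.cluster.kubernetes

    import io.fabric8.kubernetes.api.model.{Pod, Volume, VolumeMount}

    // Hypothetical no-op stub: satisfies the new trait without touching the
    // Kubernetes API or any shuffle service pod.
    private[spark] class NoOpKubernetesExternalShuffleManager
        extends KubernetesExternalShuffleManager {

      override def start(appId: String): Unit = {}

      override def stop(): Unit = {}

      // No extra executor properties; executors keep their default shuffle config.
      override def getShuffleServiceConfigurationForExecutor(
          executorPod: Pod): Seq[(String, String)] = Seq.empty

      // No hostPath volumes to mount into executor pods.
      override def getExecutorShuffleDirVolumesWithMounts: Seq[(Volume, VolumeMount)] = Seq.empty
    }

Passing Some(new NoOpKubernetesExternalShuffleManager), or None as KubernetesClusterManager does when dynamic allocation is disabled, through the backend's new shuffleManager constructor parameter exercises both branches of the Option without standing up any shuffle service pods.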