From 653cc9521d2461069ef14a2c39d533c83337fcc3 Mon Sep 17 00:00:00 2001
From: foxish
Date: Fri, 31 Mar 2017 18:56:35 -0700
Subject: [PATCH 1/2] Modifications to make spark-shell work

---
 .../spark/deploy/kubernetes/Client.scala      | 20 ++++++++++++++-----
 .../rest/KubernetesRestProtocolMessages.scala |  7 +++++--
 .../KubernetesSparkRestServer.scala           |  2 ++
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index e628464aa6201..1fb5d3ca36025 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.deploy.kubernetes
 
 import java.io.File
+import java.net.URI
 import java.security.SecureRandom
 import java.util.ServiceLoader
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -24,15 +25,16 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.SettableFuture
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher, ConfigBuilder => K8SConfigBuilder}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.codec.binary.Base64
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy._
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.rest._
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -103,7 +105,8 @@ private[spark] class Client(
         throw new SparkException(s"File $file does not exist or is a directory.")
       }
     }
-    if (KubernetesFileUtils.isUriLocalFile(mainAppResource) &&
+    if (!SparkSubmit.isShell(mainAppResource) &&
+      KubernetesFileUtils.isUriLocalFile(mainAppResource) &&
       !new File(Utils.resolveURI(mainAppResource).getPath).isFile) {
       throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
         s" is a directory.")
@@ -638,7 +641,13 @@ private[spark] class Client(
       submitterLocalFiles: Iterable[String],
       submitterLocalJars: Iterable[String],
       driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = {
-    val mainResourceUri = Utils.resolveURI(mainAppResource)
+
+    val mainResourceUri = if (SparkSubmit.isShell(mainAppResource)) {
+      new URI("nop://file")
+    } else {
+      Utils.resolveURI(mainAppResource)
+    }
+
     val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme)
       .getOrElse("file") match {
       case "file" =>
@@ -647,6 +656,7 @@ private[spark] class Client(
         val fileBase64 = Base64.encodeBase64String(fileBytes)
         UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
       case "local" => ContainerAppResource(mainAppResource)
+      case "nop" => NopAppResource()
       case other => RemoteAppResource(other)
     }
     val uploadFilesBase64Contents = CompressionUtils.createTarGzip(submitterLocalFiles.map(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 1ea44109c5f5e..5fa1cc23f6d1b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.spark.deploy.rest
 
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type
 import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
-
 import org.apache.spark.SPARK_VERSION
 
 case class KubernetesCredentials(
@@ -53,7 +53,8 @@ case class TarGzippedData(
 @JsonSubTypes(value = Array(
   new JsonSubTypes.Type(value = classOf[UploadedAppResource], name = "UploadedAppResource"),
   new JsonSubTypes.Type(value = classOf[ContainerAppResource], name = "ContainerLocalAppResource"),
-  new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource")))
+  new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource"),
+  new JsonSubTypes.Type(value = classOf[NopAppResource], name = "NopAppResource")))
 abstract class AppResource
 
 case class UploadedAppResource(
@@ -64,6 +65,8 @@ case class ContainerAppResource(resourcePath: String) extends AppResource
 
 case class RemoteAppResource(resource: String) extends AppResource
 
+case class NopAppResource() extends AppResource
+
 class PingResponse extends SubmitRestProtocolResponse {
   val text = "pong"
   message = "pong"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index 4ca01b2f6bd38..67b1262bed4c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -378,6 +378,8 @@ private[spark] class KubernetesSparkRestServer(
               s" does not exist at $downloadedFilePath")
           }
           ResolvedAppResource(downloadedFilePath, resource)
+        case NopAppResource() =>
+          ResolvedAppResource("", "")
       }
     }
   }

From 5a627528a5aefd337ea0519648220ee1e149c287 Mon Sep 17 00:00:00 2001
From: foxish
Date: Fri, 31 Mar 2017 19:20:42 -0700
Subject: [PATCH 2/2] Change spark-submit to allow k8s cluster mode to use
 spark shell

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 002b29d5564e1..3923a3f307dee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -345,8 +345,12 @@ object SparkSubmit {
         "applications on standalone clusters.")
       case (LOCAL, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") + case (MESOS, CLUSTER) if isShell(args.primaryResource) => + printErrorAndExit("MESOS Cluster deploy mode is not applicable to Spark shells.") + case (YARN, CLUSTER) if isShell(args.primaryResource) => + printErrorAndExit("YARN Cluster deploy mode is not applicable to Spark shells.") + case (STANDALONE, CLUSTER) if isShell(args.primaryResource) => + printErrorAndExit("Standalone Cluster deploy mode is not applicable to Spark shells.") case (_, CLUSTER) if isSqlShell(args.mainClass) => printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.") case (_, CLUSTER) if isThriftServer(args.mainClass) =>