Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,12 @@ object SparkSubmit {
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (MESOS, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("MESOS Cluster deploy mode is not applicable to Spark shells.")
case (YARN, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("YARN Cluster deploy mode is not applicable to Spark shells.")
case (STANDALONE, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Standalone Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@
package org.apache.spark.deploy.kubernetes

import java.io.File
import java.net.URI
import java.security.SecureRandom
import java.util.ServiceLoader
import java.util.concurrent.{CountDownLatch, TimeUnit}

import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher, ConfigBuilder => K8SConfigBuilder}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._

import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy._
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest._
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}
Expand Down Expand Up @@ -103,7 +105,8 @@ private[spark] class Client(
throw new SparkException(s"File $file does not exist or is a directory.")
}
}
if (KubernetesFileUtils.isUriLocalFile(mainAppResource) &&
if (!SparkSubmit.isShell(mainAppResource) &&
KubernetesFileUtils.isUriLocalFile(mainAppResource) &&
!new File(Utils.resolveURI(mainAppResource).getPath).isFile) {
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
s" is a directory.")
Expand Down Expand Up @@ -638,7 +641,13 @@ private[spark] class Client(
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String],
driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = {
val mainResourceUri = Utils.resolveURI(mainAppResource)

val mainResourceUri = if (SparkSubmit.isShell(mainAppResource)) {
new URI("nop://file")
} else {
Utils.resolveURI(mainAppResource)
}

val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme)
.getOrElse("file") match {
case "file" =>
Expand All @@ -647,6 +656,7 @@ private[spark] class Client(
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
case "local" => ContainerAppResource(mainAppResource)
case "nop" => NopAppResource()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this nop resource I think is the same concept as Spark already has in SparkLauncher#NO_RESOURCE

Seems like this bit of your PR could be slightly modified to address #213 ?

case other => RemoteAppResource(other)
}
val uploadFilesBase64Contents = CompressionUtils.createTarGzip(submitterLocalFiles.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.deploy.rest

import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

import org.apache.spark.SPARK_VERSION

case class KubernetesCredentials(
Expand Down Expand Up @@ -53,7 +53,8 @@ case class TarGzippedData(
@JsonSubTypes(value = Array(
new JsonSubTypes.Type(value = classOf[UploadedAppResource], name = "UploadedAppResource"),
new JsonSubTypes.Type(value = classOf[ContainerAppResource], name = "ContainerLocalAppResource"),
new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource")))
new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource"),
new JsonSubTypes.Type(value = classOf[NopAppResource], name = "NopAppResource")))
abstract class AppResource

case class UploadedAppResource(
Expand All @@ -64,6 +65,8 @@ case class ContainerAppResource(resourcePath: String) extends AppResource

case class RemoteAppResource(resource: String) extends AppResource

case class NopAppResource() extends AppResource

class PingResponse extends SubmitRestProtocolResponse {
val text = "pong"
message = "pong"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ private[spark] class KubernetesSparkRestServer(
s" does not exist at $downloadedFilePath")
}
ResolvedAppResource(downloadedFilePath, resource)
case NopAppResource() =>
ResolvedAppResource("", "")
}
}
}
Expand Down