This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Status: Closed
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.io.File
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
@@ -475,7 +475,8 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
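A minimal, self-contained sketch of the OptionAssigner pattern the hunk above modifies, showing how adding KUBERNETES to the cluster-manager mask lets --jars flow into spark.jars for Kubernetes submissions. The flag values, case class, and resolution loop below are illustrative stand-ins, not the real SparkSubmit internals.

```scala
// Illustrative only: simplified stand-ins for SparkSubmit's internal flags and OptionAssigner.
object OptionAssignerSketch {
  val LOCAL = 1
  val STANDALONE = 2
  val MESOS = 4
  val YARN = 8
  val KUBERNETES = 16
  val CLIENT = 1
  val CLUSTER = 2
  val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, sysProp: String)

  def main(args: Array[String]): Unit = {
    val userJars = "local:///opt/app/dep.jar"
    val assigners = Seq(
      OptionAssigner(userJars, LOCAL, CLIENT, sysProp = "spark.jars"),
      // The change above adds KUBERNETES to this mask, so Kubernetes submissions
      // also propagate --jars through the spark.jars property.
      OptionAssigner(userJars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        sysProp = "spark.jars"))

    val (clusterManager, deployMode) = (KUBERNETES, CLUSTER)
    val sysProps = assigners
      .filter(a => a.value != null &&
        (a.clusterManager & clusterManager) != 0 &&
        (a.deployMode & deployMode) != 0)
      .map(a => a.sysProp -> a.value)
      .toMap
    println(sysProps) // Map(spark.jars -> local:///opt/app/dep.jar)
  }
}
```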
@@ -61,7 +61,7 @@ private[spark] class Client(
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS)
private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)

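The .filter(_.nonEmpty) added above makes an explicitly empty KUBERNETES_DRIVER_UPLOAD_JARS value behave like an unset one. A minimal sketch of that Option semantics, with the config machinery stubbed as a plain Option[String]:

```scala
// Sketch: an empty uploaded-jars setting is treated the same as no setting at all.
object UploadJarsFilterSketch {
  def resolveUploadedJars(raw: Option[String]): Option[String] = raw.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    assert(resolveUploadedJars(Some("/tmp/deps.jar")).contains("/tmp/deps.jar"))
    assert(resolveUploadedJars(Some("")).isEmpty) // empty string behaves like unset
    assert(resolveUploadedJars(None).isEmpty)
    println("uploaded-jars filtering behaves as expected")
  }
}
```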
@@ -27,7 +27,7 @@ import org.apache.commons.codec.binary.Base64
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
@@ -174,8 +174,7 @@ private[spark] class KubernetesSparkRestServer(
val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
val driverClasspath = driverExtraClasspath ++
resolvedJars ++
sparkJars ++
Array(appResourcePath)
sparkJars
val resolvedSparkProperties = new mutable.HashMap[String, String]
resolvedSparkProperties ++= sparkProperties
resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
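The hunk above stops appending the primary app resource to the driver classpath; presumably it now reaches the driver through resolvedJars and spark.jars instead (see the SparkSubmit change earlier in this diff). A sketch of the resulting classpath assembly, using illustrative paths rather than the real KubernetesSparkRestServer fields:

```scala
import scala.collection.mutable

// Illustrative values; in the server these come from the submit request and SPARK_HOME.
object DriverClasspathSketch {
  def main(args: Array[String]): Unit = {
    val driverExtraClasspath = Seq("/opt/extra/metrics.jar")
    val resolvedJars = Seq("/opt/spark/work/app-main.jar", "/opt/spark/work/helper.jar")
    val sparkJars = Seq("/opt/spark/jars/spark-core.jar")

    // The app resource is no longer concatenated here; it is expected to be
    // part of resolvedJars and therefore of spark.jars.
    val driverClasspath = driverExtraClasspath ++ resolvedJars ++ sparkJars

    val resolvedSparkProperties = new mutable.HashMap[String, String]
    resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")

    println(driverClasspath.mkString(java.io.File.pathSeparator))
    println(resolvedSparkProperties("spark.jars"))
  }
}
```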
50 changes: 13 additions & 37 deletions resource-managers/kubernetes/docker-minimal-bundle/pom.xml
@@ -39,14 +39,24 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-assembly_${scala.binary.version}</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>pom</type>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-examples_${scala.binary.version}</artifactId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
@@ -107,38 +117,4 @@
</plugin>
</plugins>
</build>

<!-- Include other profiles from the assembly. -->
<profiles>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-ganglia-lgpl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
@@ -66,11 +66,6 @@
<unpack>false</unpack>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<excludes>
<exclude>org.apache.spark:spark-assembly_${scala.binary.version}:pom</exclude>
<exclude>org.spark-project.spark:unused</exclude>
<exclude>org.apache.spark:spark-examples_${scala.binary.version}</exclude>
</excludes>
</dependencySet>
<dependencySet>
<outputDirectory>examples/jars</outputDirectory>
@@ -79,7 +74,8 @@
<scope>provided</scope>
<useProjectArtifact>false</useProjectArtifact>
<includes>
<include>org.apache.spark:spark-examples_${scala.binary.version}:jar</include>
<include>org.apache.spark:spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version}:jar</include>
<include>org.apache.spark:spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}:jar</include>
</includes>
</dependencySet>
</dependencySets>
@@ -75,11 +75,6 @@
<unpack>false</unpack>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<excludes>
<exclude>org.apache.spark:spark-assembly_${scala.binary.version}:pom</exclude>
<exclude>org.spark-project.spark:unused</exclude>
<exclude>org.apache.spark:spark-examples_${scala.binary.version}</exclude>
</excludes>
</dependencySet>
<dependencySet>
<outputDirectory>examples/jars</outputDirectory>
@@ -88,7 +83,8 @@
<scope>provided</scope>
<useProjectArtifact>false</useProjectArtifact>
<includes>
<include>org.apache.spark:spark-examples_${scala.binary.version}:jar</include>
<include>org.apache.spark:spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version}:jar</include>
<include>org.apache.spark:spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}:jar</include>
</includes>
</dependencySet>
</dependencySets>
12 changes: 12 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -68,6 +68,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -19,27 +19,23 @@ package org.apache.spark.deploy.kubernetes.integrationtest
import java.io.File
import java.nio.file.Paths
import java.util.UUID
import java.util.concurrent.TimeUnit

import com.google.common.collect.ImmutableList
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import io.fabric8.kubernetes.client.{Config, KubernetesClient}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.Client
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils

private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {

@@ -53,14 +49,29 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.listFiles()(0)
.getAbsolutePath

private val EXAMPLES_JAR_FILE_NAME = Paths.get("target", "docker", "driver", "examples", "jars")
.toFile
.listFiles()
.toList
.map(_.getName)
.find(_.startsWith("spark-examples"))
.getOrElse(throw new IllegalStateException("Expected to find spark-examples jar; was the" +
" pre-integration-test phase run?"))
private val CONTAINER_MOUNTED_EXAMPLES_JAR_FILE_NAME =
Paths.get("target", "docker", "driver", "examples", "jars")
.toFile
.listFiles()
.toList
.map(_.getName)
.find(_.startsWith("spark-kubernetes-integration-tests-spark-jobs_"))
.getOrElse(throw new IllegalStateException("Expected to find integration test spark job" +
" jar; was the pre-integration-test phase run?"))
private val CONTAINER_MOUNTED_HELPER_JAR_FILE_NAME =
Paths.get("target", "docker", "driver", "examples", "jars")
.toFile
.listFiles()
.toList
.map(_.getName)
.find(_.startsWith("spark-kubernetes-integration-tests-spark-jobs-helpers_"))
.getOrElse(throw new IllegalStateException("Expected to find the integration test spark job" +
" helper jar; was the pre-integration-test phase run?"))

private val CONTAINER_MOUNTED_EXAMPLES_JAR_PATH = "container:///opt/spark/examples/jars/" +
CONTAINER_MOUNTED_EXAMPLES_JAR_FILE_NAME
private val CONTAINER_MOUNTED_HELPER_JAR_PATH = "/opt/spark/examples/jars/" +
CONTAINER_MOUNTED_HELPER_JAR_FILE_NAME

private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
@@ -114,6 +125,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.withGracePeriod(60)
.delete
})
System.setProperty("spark.jars", "")
System.setProperty(KUBERNETES_DRIVER_UPLOAD_JARS.key, "")
}
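The two System.setProperty calls added above clear submit-related properties between tests so that one test's --jars and upload-jars settings cannot leak into the next SparkSubmit invocation. A small sketch of the same reset pattern; the upload-jars key string below is an assumption, as the suite uses KUBERNETES_DRIVER_UPLOAD_JARS.key:

```scala
import org.scalatest.{BeforeAndAfter, FunSuite}

class PropertyResetSketch extends FunSuite with BeforeAndAfter {
  // Assumed key string for illustration; the real suite references the config constant.
  private val uploadJarsKey = "spark.kubernetes.driver.uploads.jars"

  after {
    // Reset submit-related system properties after every test.
    System.setProperty("spark.jars", "")
    System.setProperty(uploadJarsKey, "")
  }

  test("spark.jars set during a test does not leak into later tests") {
    System.setProperty("spark.jars", "local:///opt/app/dep.jar")
    assert(System.getProperty("spark.jars").nonEmpty)
  }
}
```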

override def afterAll(): Unit = {
@@ -219,54 +232,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"--executor-memory", "512m",
"--executor-cores", "1",
"--num-executors", "1",
"--class", "org.apache.spark.examples.SparkPi",
"--class", MAIN_CLASS,
"--jars", CONTAINER_MOUNTED_HELPER_JAR_PATH,
"--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
"--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
"--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
s"container:///opt/spark/examples/jars/$EXAMPLES_JAR_FILE_NAME")
val allContainersSucceeded = SettableFuture.create[Boolean]
val watcher = new Watcher[Pod] {
override def eventReceived(action: Action, pod: Pod): Unit = {
if (action == Action.ERROR) {
allContainersSucceeded.setException(
new SparkException("The execution of the driver pod failed."))
} else if (action == Action.MODIFIED &&
pod.getStatus.getContainerStatuses.asScala.nonEmpty &&
pod.getStatus
.getContainerStatuses
.asScala
.forall(_.getState.getTerminated != null)) {
allContainersSucceeded.set(
pod.getStatus
.getContainerStatuses
.asScala
.forall(_.getState.getTerminated.getExitCode == 0)
)
}
}

override def onClose(e: KubernetesClientException): Unit = {
logError("Integration test pod watch closed", e)
}
}
Utils.tryWithResource(
minikubeKubernetesClient
.pods
.withLabel("spark-app-name", "spark-pi")
.watch(watcher)) { _ =>
SparkSubmit.main(args)
assert(allContainersSucceeded.get(2, TimeUnit.MINUTES),
"Some containers exited with a non-zero status.")
}
val driverPod = minikubeKubernetesClient.pods
.withLabel("spark-app-name", "spark-pi")
.list
.getItems
.get(0)
val jobLog = minikubeKubernetesClient.pods.withName(driverPod.getMetadata.getName).getLog
assert(jobLog.contains("Pi is roughly"), "Pi was not computed by the job...")
CONTAINER_MOUNTED_EXAMPLES_JAR_PATH)
SparkSubmit.main(args)
val sparkMetricsService = getSparkMetricsService("spark-pi")
expectationsForStaticAllocation(sparkMetricsService)
}
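Instead of watching pod container statuses and grepping the driver log, the rewritten test above submits the job and then asserts on the driver's status REST API via getSparkMetricsService and expectationsForStaticAllocation. A hypothetical stand-in for that polling-style assertion; the StatusApi trait and its methods are illustrative, not the suite's real SparkRestApiV1 helpers:

```scala
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}

object MetricsAssertionSketch extends Eventually {
  private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
  private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))

  // Hypothetical minimal view of the driver's status REST API.
  trait StatusApi {
    def executorCount(): Int
    def completedJobCount(): Int
  }

  def expectStaticAllocation(api: StatusApi, expectedExecutors: Int): Unit =
    eventually(TIMEOUT, INTERVAL) {
      // With static allocation, the requested executor count should be reached
      // and at least one job should have completed.
      assert(api.executorCount() == expectedExecutors)
      assert(api.completedJobCount() >= 1)
    }

  def main(args: Array[String]): Unit = {
    // A stub that already reports the target state, so the sketch runs standalone.
    val stub = new StatusApi {
      override def executorCount(): Int = 1
      override def completedJobCount(): Int = 1
    }
    expectStaticAllocation(stub, expectedExecutors = 1)
  }
}
```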

test("Run with custom labels") {