docs/running-on-kubernetes.md (16 additions, 0 deletions)
@@ -558,6 +558,22 @@ from the other deployment modes. See the [configuration page](configuration.html
disk as a secret into the init-containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountdependencies.jarsDownloadDir</code></td>
<td><code>/var/spark-data/spark-jars</code></td>
<td>
Location to download jars to in the driver and executors. This will be mounted as an empty directory volume
into the driver and executor containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountdependencies.filesDownloadDir</code></td>
<td><code>/var/spark-data/spark-files</code></td>
<td>
Location to download files to in the driver and executors. This will be mounted as an empty directory volume
into the driver and executor containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.report.interval</code></td>
<td><code>1s</code></td>
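For illustration (not part of the diff), here is a minimal sketch of how an application submitted to Kubernetes might override the two new download directories documented above. Only the configuration keys and defaults come from this change; the override paths are hypothetical examples:

import org.apache.spark.SparkConf

// Hypothetical overrides of the download locations. Both directories are mounted
// as empty directory (emptyDir) volumes on the driver and executor pods.
val conf = new SparkConf()
  .set("spark.kubernetes.mountdependencies.jarsDownloadDir", "/tmp/spark-jars")   // example path, not the default
  .set("spark.kubernetes.mountdependencies.filesDownloadDir", "/tmp/spark-files") // example path, not the default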
@@ -93,6 +93,10 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
.endVolume()
.editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
.addToVolumeMounts(sharedVolumeMounts: _*)
.addNewEnv()
.withName(ENV_MOUNTED_FILES_DIR)
.withValue(filesDownloadPath)
.endEnv()
.endContainer()
.endSpec()
resourceStagingServerSecretPlugin.map { plugin =>
@@ -447,15 +447,15 @@ package object config extends Logging {
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-submitted-jars")
.createWithDefault("/var/spark-data/spark-jars")

private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-submitted-files")
.createWithDefault("/var/spark-data/spark-files")

private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
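As added context (not part of the diff), a minimal sketch of how code inside Spark's Kubernetes module might read one of these entries. SparkConf.get(ConfigEntry) is package-private to Spark, so this only compiles within the org.apache.spark namespace, and the helper object is hypothetical:

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Hypothetical helper (not in the PR): resolves the files download directory,
// returning the default "/var/spark-data/spark-files" unless the user has set
// spark.kubernetes.mountdependencies.filesDownloadDir.
private[spark] object DownloadDirResolver {
  def filesDownloadPath(sparkConf: SparkConf): String =
    sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
}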
@@ -92,6 +92,7 @@ package object constants {
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
@@ -111,6 +111,16 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
})
}

test("Files download path is set as environment variable") {
val bootstrappedPod = bootstrapPodWithoutSubmittedDependencies()
val containers = bootstrappedPod.getSpec.getContainers.asScala
val maybeMainContainer = containers.find(_.getName === MAIN_CONTAINER_NAME)
assert(maybeMainContainer.exists { mainContainer =>
mainContainer.getEnv.asScala.exists(envVar =>
envVar.getName == ENV_MOUNTED_FILES_DIR && envVar.getValue == FILES_DOWNLOAD_PATH)
})
}

test("Running with submitted dependencies modifies the init container with the plugin.") {
val bootstrappedPod = bootstrapPodWithSubmittedDependencies()
val podAnnotations = bootstrappedPod.getMetadata.getAnnotations.asScala
@@ -40,4 +40,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
Reviewer: maybe can use ln instead of cp to reduce disk churn?

Author: We don't want to symbolic link the directory itself, but we can create a link to each file in the directory?

Reviewer: Right, could do some unix-foo to make the files appear in the other directory. Something like:

if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then find "$SPARK_MOUNTED_FILES_DIR/" -type f -exec ln -s {} . \; ; fi && \

This is something we can do after this PR though -- would you rather revisit later if profiling reveals the copy is slow? It should only affect large files. If we push, I can file an issue to remind us to come back to this

Author: Yeah - spark.files is meant to be for small things, usually.

exec ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
@@ -40,4 +40,5 @@ WORKDIR /opt/spark
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
@@ -28,7 +28,9 @@ private[spark] object FileExistenceTest {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
throw new IllegalArgumentException("Usage: WordCount <source-file> <expected contents>")
throw new IllegalArgumentException(
s"Invalid args: ${args.mkString}, " +
"Usage: FileExistenceTest <source-file> <expected contents>")
}
// Can't use SparkContext.textFile since the file is local to the driver
val file = Paths.get(args(0)).toFile
@@ -39,16 +41,15 @@
val contents = Files.toString(file, Charsets.UTF_8)
if (args(1) != contents) {
throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
s" actual, $contents")
s" actual: $contents")
} else {
println(s"File found at ${file.getAbsolutePath} with correct contents.")
}
// scalastyle:on println
}
val spark = SparkSession.builder()
.appName("Test")
.getOrCreate()
spark.stop()
while (true) {
Thread.sleep(600000)
}
}

}
@@ -16,6 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest

import java.io.File
import java.nio.file.Paths
import java.util.UUID

@@ -35,11 +36,11 @@ import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minik
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.{Client, KeyAndCertPem}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils

private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
import KubernetesSuite._
private val testBackend = IntegrationTestBackendFactory.getTestBackend()

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _
@@ -124,7 +125,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
sparkConf.set("spark.app.name", "group-by-test")
runSparkGroupByTestAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
runSparkApplicationAndVerifyCompletion(
SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
GROUP_BY_MAIN_CLASS,
"The Result is",
Array.empty[String])
}

test("Use remote resources without the resource staging server.") {
@@ -173,6 +178,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE)
}

test("Added files should be placed in the driver's working directory.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8)
launchStagingServer(SSLOptions(), None)
sparkConf.set("spark.files", testExistenceFile.getAbsolutePath)
runSparkApplicationAndVerifyCompletion(
SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
FILE_EXISTENCE_MAIN_CLASS,
s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
}
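For context (not part of the diff), a minimal sketch of what this new test exercises from the user's side: a file listed in spark.files is fetched into the files download directory by the init-container, and the container entrypoint's cp step places it in the working directory, so driver code can open it by its base name. The application, file name, and path below are placeholders:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical user application, submitted with e.g.
//   --conf spark.files=/local/path/input.txt   (placeholder path)
// After the entrypoint runs `cp -R "$SPARK_MOUNTED_FILES_DIR/." .`, the file is
// available in the driver container's working directory under its base name.
object ReadMountedFile {
  def main(args: Array[String]): Unit = {
    val contents = new String(Files.readAllBytes(Paths.get("input.txt")), StandardCharsets.UTF_8)
    println(s"Mounted file contents: $contents")
  }
}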

private def launchStagingServer(
resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -190,27 +209,19 @@
}

private def runSparkPiAndVerifyCompletion(appResource: String): Unit = {
Client.run(sparkConf, appResource, SPARK_PI_MAIN_CLASS, Array.empty[String])
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", APP_LOCATOR_LABEL)
.list()
.getItems
.get(0)
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains("Pi is roughly 3"), "The application did not compute the value of pi.")
}
runSparkApplicationAndVerifyCompletion(
appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String])
}

private def runSparkGroupByTestAndVerifyCompletion(appResource: String): Unit = {
private def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
expectedLogOnCompletion: String,
appArgs: Array[String]): Unit = {
Client.run(
sparkConf = sparkConf,
appArgs = Array.empty[String],
mainClass = GROUP_BY_MAIN_CLASS,
appArgs = appArgs,
mainClass = mainClass,
mainAppResource = appResource)
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
@@ -223,7 +234,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains("The Result is"), "The application did not complete.")
.contains(expectedLogOnCompletion), "The application did not complete.")
}
}

@@ -285,8 +296,6 @@ private[spark] object KubernetesSuite {
val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
s"integration-tests-jars/${HELPER_JAR_FILE.getName}"

val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile
val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8)
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
@@ -295,6 +304,7 @@
".integrationtest.jobs.FileExistenceTest"
val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.GroupByTest"
val TEST_EXISTENCE_FILE_CONTENTS = "contents"

case object ShuffleNotReadyException extends Exception
}
@@ -63,7 +63,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
.set("spark.app.name", "spark-pi")
.set("spark.app.name", "spark-test-app")
.set("spark.ui.enabled", "true")
.set("spark.testing", "false")
.set(WAIT_FOR_APP_COMPLETION, false)