Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl
import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -62,6 +62,12 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
.orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
.getOrElse(false)

OptionRequirements.requireSecondIfFirstIsDefined(
KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).headOption,
resourceStagingServerUri,
"Local JARs were provided, however no resource staging server URI was found.")

OptionRequirements.requireNandDefined(
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,80 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
private val STAGING_SERVER_URI = "http://localhost:8000"

test ("error thrown if local jars provided without resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty)

val thrown = intercept[IllegalArgumentException] {
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS,
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
}

assert(thrown.getMessage contains "Local JARs were provided, however no resource staging" +
" server URI was found.")
}

test ("error not thrown with non-local jars and resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps : Seq[InitContainerConfigurationStep] =
orchestrator.getAllConfigurationSteps()
assert(initSteps.length == 2)
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerConfigurationStep])
}

test ("error not thrown with non-local jars and no resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps : Seq[InitContainerConfigurationStep] =
orchestrator.getAllConfigurationSteps()
assert(initSteps.length == 1)
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
}

test ("including step to contact resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
Expand Down Expand Up @@ -77,7 +151,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS,
SPARK_JARS.take(1),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit brittle in case SPARK_JARS changes in the future -- we should create a new SPARK_JARS_REMOTE that has only hdfs:// path

SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
Expand Down