diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index e4ea5235af18f..b99d4e86bf5bf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl +import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl} import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl} import org.apache.spark.util.Utils @@ -62,6 +62,12 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED) .orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED)) .getOrElse(false) + + OptionRequirements.requireSecondIfFirstIsDefined( + KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).headOption, + resourceStagingServerUri, + "Local JARs were provided, however no resource staging server URI was found.") + OptionRequirements.requireNandDefined( maybeResourceStagingServerInternalClientCert, maybeResourceStagingServerInternalTrustStore, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index 1cc8007803457..ea4d60795787c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -44,6 +44,80 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key" private val STAGING_SERVER_URI = "http://localhost:8000" + test ("error thrown if local jars provided without resource staging server") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + + assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty) + + val thrown = intercept[IllegalArgumentException] { + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS, + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + } + + assert(thrown.getMessage contains "Local JARs were provided, however no resource staging" + + " server URI was found.") + } + + test ("error not thrown with non-local jars and resource staging server provided") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + .set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI) + + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length == 2) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerConfigurationStep]) + } + + test ("error not thrown with non-local jars and no resource staging server provided") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length == 1) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + } + test ("including step to contact resource staging server") { val sparkConf = new SparkConf(true) .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") @@ -77,7 +151,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { val orchestrator = new InitContainerConfigurationStepsOrchestrator( NAMESPACE, APP_RESOURCE_PREFIX, - SPARK_JARS, + SPARK_JARS.take(1), SPARK_FILES, JARS_DOWNLOAD_PATH, FILES_DOWNLOAD_PATH,