From 02274420796011eef2f9b09bd37a337e8c21552a Mon Sep 17 00:00:00 2001 From: Sahil Prasad Date: Mon, 21 Aug 2017 16:18:21 -0700 Subject: [PATCH 1/4] Fail submission if submitter-local files are provided without resource staging server URI --- .../InitContainerConfigurationStepsOrchestrator.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index e4ea5235af18f..25812ba962ce9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl +import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl} import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl} import org.apache.spark.util.Utils @@ -62,6 +62,15 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED) .orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED)) .getOrElse(false) + + OptionRequirements.requireSecondIfFirstIsDefined( + KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars ++ sparkFiles).nonEmpty match { + case true => Some(true) + case false => None + }, + resourceStagingServerUri, + "Local files were provided, however no resource staging server URI was found.") + OptionRequirements.requireNandDefined( maybeResourceStagingServerInternalClientCert, maybeResourceStagingServerInternalTrustStore, From 807a29ac99312596722ba19a07177fb3a2bd9ea1 Mon Sep 17 00:00:00 2001 From: Sahil Prasad Date: Tue, 22 Aug 2017 18:10:34 -0700 Subject: [PATCH 2/4] Modified logic to validate only submitted jars; added orchestrator tests --- ...tainerConfigurationStepsOrchestrator.scala | 12 +-- ...rConfigurationStepsOrchestratorSuite.scala | 73 ++++++++++++++++++- 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 25812ba962ce9..f92cea8ecbca4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -64,12 +64,12 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( .getOrElse(false) OptionRequirements.requireSecondIfFirstIsDefined( - KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars ++ sparkFiles).nonEmpty match { - case true => Some(true) - case false => None - }, - resourceStagingServerUri, - "Local files were provided, however no resource staging server URI was found.") + KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).nonEmpty match { + case true => Some(true) + case false => None + }, + resourceStagingServerUri, + "Local files were provided, however no resource staging server URI was found.") OptionRequirements.requireNandDefined( maybeResourceStagingServerInternalClientCert, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index 1cc8007803457..c6fa30359606f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -44,6 +44,77 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key" private val STAGING_SERVER_URI = "http://localhost:8000" + test ("error thrown if local jars provided without resource staging server") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + + assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty) + + intercept[IllegalArgumentException] { + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS, + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + } + } + + test ("error not thrown with non-local jars and resource staging server provided") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + .set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI) + + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length == 2) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerConfigurationStep]) + } + + test ("error not thrown with non-local jars and no resource staging server provided") { + val sparkConf = new SparkConf(true) + .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length == 1) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + } + test ("including step to contact resource staging server") { val sparkConf = new SparkConf(true) .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE") @@ -77,7 +148,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { val orchestrator = new InitContainerConfigurationStepsOrchestrator( NAMESPACE, APP_RESOURCE_PREFIX, - SPARK_JARS, + SPARK_JARS.take(1), SPARK_FILES, JARS_DOWNLOAD_PATH, FILES_DOWNLOAD_PATH, From 30f676b36657393e8410635a451fe0a3bd5f721c Mon Sep 17 00:00:00 2001 From: Sahil Prasad Date: Wed, 23 Aug 2017 17:05:54 -0700 Subject: [PATCH 3/4] Incorporated feedback --- .../InitContainerConfigurationStepsOrchestrator.scala | 7 ++----- .../InitContainerConfigurationStepsOrchestratorSuite.scala | 5 ++++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index f92cea8ecbca4..b99d4e86bf5bf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -64,12 +64,9 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( .getOrElse(false) OptionRequirements.requireSecondIfFirstIsDefined( - KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).nonEmpty match { - case true => Some(true) - case false => None - }, + KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).headOption, resourceStagingServerUri, - "Local files were provided, however no resource staging server URI was found.") + "Local JARs were provided, however no resource staging server URI was found.") OptionRequirements.requireNandDefined( maybeResourceStagingServerInternalClientCert, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index c6fa30359606f..fe1b53bfef7ee 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -51,7 +51,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty) - intercept[IllegalArgumentException] { + val thrown = intercept[IllegalArgumentException] { val orchestrator = new InitContainerConfigurationStepsOrchestrator( NAMESPACE, APP_RESOURCE_PREFIX, @@ -65,6 +65,9 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { INIT_CONTAINER_CONFIG_MAP_KEY, sparkConf) } + + assert(thrown.getMessage === "Local JARs were provided, however no resource staging" + + " server URI was found.") } test ("error not thrown with non-local jars and resource staging server provided") { From 7c3da22cac26d7d0e332c1f9087d96f25613aca4 Mon Sep 17 00:00:00 2001 From: Sahil Prasad Date: Thu, 24 Aug 2017 01:49:37 -0700 Subject: [PATCH 4/4] Fix failing test case --- .../InitContainerConfigurationStepsOrchestratorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index fe1b53bfef7ee..ea4d60795787c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -66,7 +66,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { sparkConf) } - assert(thrown.getMessage === "Local JARs were provided, however no resource staging" + + assert(thrown.getMessage contains "Local JARs were provided, however no resource staging" + " server URI was found.") }