From 09d6611b4e5caed7b60b67bfbd88839248cdceb0 Mon Sep 17 00:00:00 2001 From: "chandulal.kavar" Date: Wed, 6 Mar 2019 23:36:36 +0800 Subject: [PATCH 1/5] [SPARK-27061][K8S] Expose Driver UI port on driver service to access UI using service. --- .../scala/org/apache/spark/deploy/k8s/Constants.scala | 1 + .../k8s/features/DriverServiceFeatureStep.scala | 9 +++++++++ .../k8s/features/DriverServiceFeatureStepSuite.scala | 11 +++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index a3c74ff7b2885..47dd8ad428059 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -45,6 +45,7 @@ private[spark] object Constants { // Default and fixed ports val DEFAULT_DRIVER_PORT = 7078 val DEFAULT_BLOCKMANAGER_PORT = 7079 + val DEFAULT_UI_PORT = 4040 val DRIVER_PORT_NAME = "driver-rpc-port" val BLOCK_MANAGER_PORT_NAME = "blockmanager" val UI_PORT_NAME = "spark-ui" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index cec8769b8378e..270c0ded44fdb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock} private[spark] class DriverServiceFeatureStep( @@ -55,6 +56,9 @@ private[spark] class DriverServiceFeatureStep( private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + val driverUIPort = SparkUI.getUIPort(kubernetesConf + .sparkConf) + override def configurePod(pod: SparkPod): SparkPod = pod override def getAdditionalPodSystemProperties(): Map[String, String] = { @@ -82,6 +86,11 @@ private[spark] class DriverServiceFeatureStep( .withPort(driverBlockManagerPort) .withNewTargetPort(driverBlockManagerPort) .endPort() + .addNewPort() + .withName(UI_PORT_NAME) + .withPort(driverUIPort) + .withNewTargetPort(driverUIPort) + .endPort() .endSpec() .build() Seq(driverService) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index fbd99b73b37a4..1ffcd811b8374 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -38,10 +38,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { "label1key" -> "label1value", "label2key" -> "label2value") - test("Headless service has a port for the driver RPC and the block manager.") { + test("Headless service has a port for the driver RPC, the block manager and driver ui.") { val sparkConf = new SparkConf(false) .set(DRIVER_PORT, 9000) .set(DRIVER_BLOCK_MANAGER_PORT, 8080) + .set(UI_PORT_NAME, "4040") val kconf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, labels = DRIVER_LABELS) @@ -56,6 +57,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( 9000, 8080, + 4040, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", driverService) } @@ -85,6 +87,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( DEFAULT_DRIVER_PORT, DEFAULT_BLOCKMANAGER_PORT, + DEFAULT_UI_PORT, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() @@ -152,6 +155,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { private def verifyService( driverPort: Int, blockManagerPort: Int, + drierUIPort: Int, expectedServiceName: String, service: Service): Unit = { assert(service.getMetadata.getName === expectedServiceName) @@ -159,7 +163,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { DRIVER_LABELS.foreach { case (k, v) => assert(service.getSpec.getSelector.get(k) === v) } - assert(service.getSpec.getPorts.size() === 2) + assert(service.getSpec.getPorts.size() === 3) val driverServicePorts = service.getSpec.getPorts.asScala assert(driverServicePorts.head.getName === DRIVER_PORT_NAME) assert(driverServicePorts.head.getPort.intValue() === driverPort) @@ -167,5 +171,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME) assert(driverServicePorts(1).getPort.intValue() === blockManagerPort) assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort) + assert(driverServicePorts(2).getName === UI_PORT_NAME) + assert(driverServicePorts(2).getPort.intValue() === drierUIPort) + assert(driverServicePorts(2).getTargetPort.getIntVal === drierUIPort) } } From 0d8e4fcfffb8991d6be4a7346eccba977ebde11d Mon Sep 17 00:00:00 2001 From: "chandulal.kavar" Date: Thu, 7 Mar 2019 13:15:26 +0800 Subject: [PATCH 2/5] [SPARK-27061][K8S] Use default value of UI_PORT --- .../main/scala/org/apache/spark/deploy/k8s/Constants.scala | 4 +++- .../deploy/k8s/features/DriverServiceFeatureStep.scala | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 47dd8ad428059..f37c3a719e987 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s +import org.apache.spark.internal.config.UI._ + private[spark] object Constants { // Labels @@ -45,7 +47,7 @@ private[spark] object Constants { // Default and fixed ports val DEFAULT_DRIVER_PORT = 7078 val DEFAULT_BLOCKMANAGER_PORT = 7079 - val DEFAULT_UI_PORT = 4040 + val DEFAULT_UI_PORT = UI_PORT.defaultValue.get val DRIVER_PORT_NAME = "driver-rpc-port" val BLOCK_MANAGER_PORT_NAME = "blockmanager" val UI_PORT_NAME = "spark-ui" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 270c0ded44fdb..95a8f2ef33399 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.config.UI._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock} @@ -55,9 +56,8 @@ private[spark] class DriverServiceFeatureStep( config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT) private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) - - val driverUIPort = SparkUI.getUIPort(kubernetesConf - .sparkConf) + private val driverUIPort = kubernetesConf.sparkConf.getInt( + UI_PORT.key, DEFAULT_UI_PORT) override def configurePod(pod: SparkPod): SparkPod = pod From ffc2e2fd52b05fc589083185a5faab4bc2957794 Mon Sep 17 00:00:00 2001 From: "chandulal.kavar" Date: Fri, 8 Mar 2019 10:50:59 +0800 Subject: [PATCH 3/5] [SPARK-27061][K8S] Use UI_PORT default value --- .../main/scala/org/apache/spark/deploy/k8s/Constants.scala | 3 --- .../deploy/k8s/features/DriverServiceFeatureStep.scala | 2 +- .../k8s/features/DriverServiceFeatureStepSuite.scala | 7 ++++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index f37c3a719e987..a3c74ff7b2885 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.deploy.k8s -import org.apache.spark.internal.config.UI._ - private[spark] object Constants { // Labels @@ -47,7 +45,6 @@ private[spark] object Constants { // Default and fixed ports val DEFAULT_DRIVER_PORT = 7078 val DEFAULT_BLOCKMANAGER_PORT = 7079 - val DEFAULT_UI_PORT = UI_PORT.defaultValue.get val DRIVER_PORT_NAME = "driver-rpc-port" val BLOCK_MANAGER_PORT_NAME = "blockmanager" val UI_PORT_NAME = "spark-ui" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 95a8f2ef33399..df43d739b23e0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -57,7 +57,7 @@ private[spark] class DriverServiceFeatureStep( private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) private val driverUIPort = kubernetesConf.sparkConf.getInt( - UI_PORT.key, DEFAULT_UI_PORT) + UI_PORT.key, UI_PORT.defaultValue.get) override def configurePod(pod: SparkPod): SparkPod = pod diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 1ffcd811b8374..9068289bab581 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.UI._ import org.apache.spark.util.ManualClock class DriverServiceFeatureStepSuite extends SparkFunSuite { @@ -42,7 +43,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { val sparkConf = new SparkConf(false) .set(DRIVER_PORT, 9000) .set(DRIVER_BLOCK_MANAGER_PORT, 8080) - .set(UI_PORT_NAME, "4040") + .set(UI_PORT, 4080) val kconf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, labels = DRIVER_LABELS) @@ -57,7 +58,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( 9000, 8080, - 4040, + 4080, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", driverService) } @@ -87,7 +88,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( DEFAULT_DRIVER_PORT, DEFAULT_BLOCKMANAGER_PORT, - DEFAULT_UI_PORT, + UI_PORT.defaultValue.get, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() From b7c4e660418919f965f85b27a8e202d54fb1d709 Mon Sep 17 00:00:00 2001 From: "chandulal.kavar" Date: Fri, 8 Mar 2019 14:53:42 +0800 Subject: [PATCH 4/5] [SPARK-27061][K8S] Use get method of sparkconf to get UI Port --- .../spark/deploy/k8s/features/DriverServiceFeatureStep.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index df43d739b23e0..0f3172e7bb3a6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -56,8 +56,7 @@ private[spark] class DriverServiceFeatureStep( config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT) private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) - private val driverUIPort = kubernetesConf.sparkConf.getInt( - UI_PORT.key, UI_PORT.defaultValue.get) + private val driverUIPort = kubernetesConf.get(UI_PORT) override def configurePod(pod: SparkPod): SparkPod = pod From 6945580b9a742a43964747364c3373b0c558e092 Mon Sep 17 00:00:00 2001 From: "chandulal.kavar" Date: Sat, 9 Mar 2019 09:41:37 +0800 Subject: [PATCH 5/5] [SPARK-27061][K8S] Remove unused imports --- .../spark/deploy/k8s/features/DriverServiceFeatureStep.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 0f3172e7bb3a6..925bcdf3e637f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -23,8 +23,6 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} -import org.apache.spark.internal.config.UI._ -import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock} private[spark] class DriverServiceFeatureStep( @@ -56,7 +54,7 @@ private[spark] class DriverServiceFeatureStep( config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT) private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) - private val driverUIPort = kubernetesConf.get(UI_PORT) + private val driverUIPort = kubernetesConf.get(config.UI.UI_PORT) override def configurePod(pod: SparkPod): SparkPod = pod