diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index cec8769b8378e..925bcdf3e637f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -54,6 +54,7 @@ private[spark] class DriverServiceFeatureStep( config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT) private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + private val driverUIPort = kubernetesConf.get(config.UI.UI_PORT) override def configurePod(pod: SparkPod): SparkPod = pod @@ -82,6 +83,11 @@ private[spark] class DriverServiceFeatureStep( .withPort(driverBlockManagerPort) .withNewTargetPort(driverBlockManagerPort) .endPort() + .addNewPort() + .withName(UI_PORT_NAME) + .withPort(driverUIPort) + .withNewTargetPort(driverUIPort) + .endPort() .endSpec() .build() Seq(driverService) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index fbd99b73b37a4..9068289bab581 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.UI._ import org.apache.spark.util.ManualClock class DriverServiceFeatureStepSuite extends SparkFunSuite { @@ -38,10 +39,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { "label1key" -> "label1value", "label2key" -> "label2value") - test("Headless service has a port for the driver RPC and the block manager.") { + test("Headless service has a port for the driver RPC, the block manager and driver ui.") { val sparkConf = new SparkConf(false) .set(DRIVER_PORT, 9000) .set(DRIVER_BLOCK_MANAGER_PORT, 8080) + .set(UI_PORT, 4080) val kconf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, labels = DRIVER_LABELS) @@ -56,6 +58,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( 9000, 8080, + 4080, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", driverService) } @@ -85,6 +88,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { verifyService( DEFAULT_DRIVER_PORT, DEFAULT_BLOCKMANAGER_PORT, + UI_PORT.defaultValue.get, s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() @@ -152,6 +156,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { private def verifyService( driverPort: Int, blockManagerPort: Int, + drierUIPort: Int, expectedServiceName: String, service: Service): Unit = { assert(service.getMetadata.getName === expectedServiceName) @@ -159,7 +164,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { DRIVER_LABELS.foreach { case (k, v) => assert(service.getSpec.getSelector.get(k) === v) } - assert(service.getSpec.getPorts.size() === 2) + assert(service.getSpec.getPorts.size() === 3) val driverServicePorts = service.getSpec.getPorts.asScala assert(driverServicePorts.head.getName === DRIVER_PORT_NAME) assert(driverServicePorts.head.getPort.intValue() === driverPort) @@ -167,5 +172,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME) assert(driverServicePorts(1).getPort.intValue() === blockManagerPort) assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort) + assert(driverServicePorts(2).getName === UI_PORT_NAME) + assert(driverServicePorts(2).getPort.intValue() === drierUIPort) + assert(driverServicePorts(2).getTargetPort.getIntVal === drierUIPort) } }