Skip to content

Commit c8327c5

Browse files
committed
[SPARK-52193][K8S] Add Spark Connect port to Spark Driver pod and service
### What changes were proposed in this pull request? This PR aims to add `Spark Connect` port to `Spark Driver` pod and service. ### Why are the changes needed? To support `Spark Connect` easily. ### Does this PR introduce _any_ user-facing change? No behavior change because this is a new port. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50925 from dongjoon-hyun/SPARK-52193. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 02f196a commit c8327c5

File tree

5 files changed

+26
-2
lines changed

5 files changed

+26
-2
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ object Constants {
5858
// Default and fixed ports
5959
val DEFAULT_DRIVER_PORT = 7078
6060
val DEFAULT_BLOCKMANAGER_PORT = 7079
61+
val DEFAULT_SPARK_CONNECT_SERVER_PORT = 15002
6162
val DRIVER_PORT_NAME = "driver-rpc-port"
6263
val BLOCK_MANAGER_PORT_NAME = "blockmanager"
6364
val UI_PORT_NAME = "spark-ui"
65+
val SPARK_CONNECT_SERVER_PORT_NAME = "spark-connect"
6466

6567
// Environment Variables
6668
val ENV_DRIVER_POD_IP = "SPARK_DRIVER_POD_IP"
@@ -104,6 +106,7 @@ object Constants {
104106
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
105107
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
106108
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
109+
val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port"
107110

108111
// Hadoop Configuration
109112
val HADOOP_CONF_VOLUME = "hadoop-properties"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
9999
conf.sparkConf.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
100100
)
101101
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
102+
val driverSparkConnectServerPort =
103+
conf.sparkConf.getInt(CONNECT_GRPC_BINDING_PORT, DEFAULT_SPARK_CONNECT_SERVER_PORT)
102104
val driverContainer = new ContainerBuilder(pod.container)
103105
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
104106
.withImage(driverContainerImage)
@@ -118,6 +120,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
118120
.withContainerPort(driverUIPort)
119121
.withProtocol("TCP")
120122
.endPort()
123+
.addNewPort()
124+
.withName(SPARK_CONNECT_SERVER_PORT_NAME)
125+
.withContainerPort(driverSparkConnectServerPort)
126+
.withProtocol("TCP")
127+
.endPort()
121128
.addNewEnv()
122129
.withName(ENV_SPARK_USER)
123130
.withValue(Utils.getCurrentUserName())

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ private[spark] class DriverServiceFeatureStep(
4848
private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
4949
config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
5050
private val driverUIPort = kubernetesConf.get(config.UI.UI_PORT)
51+
private val driverSparkConnectServerPort = kubernetesConf.sparkConf.getInt(
52+
CONNECT_GRPC_BINDING_PORT, DEFAULT_SPARK_CONNECT_SERVER_PORT)
5153

5254
override def configurePod(pod: SparkPod): SparkPod = pod
5355

@@ -86,6 +88,11 @@ private[spark] class DriverServiceFeatureStep(
8688
.withPort(driverUIPort)
8789
.withNewTargetPort(driverUIPort)
8890
.endPort()
91+
.addNewPort()
92+
.withName(SPARK_CONNECT_SERVER_PORT_NAME)
93+
.withPort(driverSparkConnectServerPort)
94+
.withNewTargetPort(driverSparkConnectServerPort)
95+
.endPort()
8996
.endSpec()
9097
.build()
9198
Seq(driverService)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
8484
val expectedPortNames = Set(
8585
containerPort(DRIVER_PORT_NAME, DEFAULT_DRIVER_PORT),
8686
containerPort(BLOCK_MANAGER_PORT_NAME, DEFAULT_BLOCKMANAGER_PORT),
87-
containerPort(UI_PORT_NAME, UI_PORT.defaultValue.get)
87+
containerPort(UI_PORT_NAME, UI_PORT.defaultValue.get),
88+
containerPort(SPARK_CONNECT_SERVER_PORT_NAME, DEFAULT_SPARK_CONNECT_SERVER_PORT)
8889
)
8990
val foundPortNames = configuredPod.container.getPorts.asScala.toSet
9091
assert(expectedPortNames === foundPortNames)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
4949
.set(DRIVER_PORT, 9000)
5050
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
5151
.set(UI_PORT, 4080)
52+
.set(CONNECT_GRPC_BINDING_PORT, "15003")
5253
val kconf = KubernetesTestConf.createDriverConf(
5354
sparkConf = sparkConf,
5455
labels = DRIVER_LABELS,
@@ -66,6 +67,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
6667
9000,
6768
8080,
6869
4080,
70+
15003,
6971
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
7072
kconf.appId,
7173
driverService)
@@ -100,6 +102,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
100102
DEFAULT_DRIVER_PORT,
101103
DEFAULT_BLOCKMANAGER_PORT,
102104
UI_PORT.defaultValue.get,
105+
DEFAULT_SPARK_CONNECT_SERVER_PORT,
103106
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
104107
kconf.appId,
105108
resolvedService)
@@ -233,6 +236,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
233236
driverPort: Int,
234237
blockManagerPort: Int,
235238
drierUIPort: Int,
239+
driverConnectServerPort: Int,
236240
expectedServiceName: String,
237241
appId: String,
238242
service: Service): Unit = {
@@ -249,7 +253,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
249253
DRIVER_SERVICE_ANNOTATIONS.foreach { case (k, v) =>
250254
assert(service.getMetadata.getAnnotations.get(k) === v)
251255
}
252-
assert(service.getSpec.getPorts.size() === 3)
256+
assert(service.getSpec.getPorts.size() === 4)
253257
val driverServicePorts = service.getSpec.getPorts.asScala
254258
assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
255259
assert(driverServicePorts.head.getPort.intValue() === driverPort)
@@ -260,5 +264,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
260264
assert(driverServicePorts(2).getName === UI_PORT_NAME)
261265
assert(driverServicePorts(2).getPort.intValue() === drierUIPort)
262266
assert(driverServicePorts(2).getTargetPort.getIntVal === drierUIPort)
267+
assert(driverServicePorts(3).getPort.intValue() === driverConnectServerPort)
268+
assert(driverServicePorts(3).getTargetPort.getIntVal === driverConnectServerPort)
263269
}
264270
}

0 commit comments

Comments
 (0)