This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit c57ccdc

mccheah authored and foxish committed
Extract constants and config into separate file. Launch => Submit. (#65)
* Extract constants and config into separate file. Launch => Submit.
* Address comments
* A small shorthand
* Refactor more ThreadUtils
* Fix scalastyle, use cached thread pool
* Tiny Scala style change
1 parent 4ff44d3 commit c57ccdc

9 files changed: +470 −250 lines

docs/running-on-kubernetes.md

Lines changed: 12 additions & 4 deletions
```diff
@@ -140,12 +140,12 @@ Spark supports using SSL to encrypt the traffic in this bootstrapping process. I
 whenever possible.
 
 See the [security page](security.html) and [configuration](configuration.html) sections for more information on
-configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` in configuring the SSL-related fields in the context
+configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context
 of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
-pod in starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`.
+pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`.
 
 One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
-container image's disk. Thus `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:`
+container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
 or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
 the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
 `container:`, the file is assumed to already be on the container's disk at the appropriate path.
@@ -235,7 +235,15 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td>(none)</td>
   <td>
     Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
-    where each label is in the format <code>key=value</code>.
+    where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the driver pod
+    for bookkeeping purposes.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driverSubmitTimeout</code></td>
+  <td>60s</td>
+  <td>
+    Time to wait for the driver pod to start running before aborting its execution.
   </td>
 </tr>
 </table>
```
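
Putting the renamed properties together, a submitter could set the SSL trustStore alongside the new timeout. A minimal sketch, assuming the standard `SparkConf` API; the trustStore path and timeout value are placeholders, not from this commit:

```scala
import org.apache.spark.SparkConf

// Placeholder values for illustration; a real deployment would point at its
// own trustStore file and pick its own timeout.
val conf = new SparkConf()
  .set("spark.ssl.kubernetes.submit.trustStore", "file:///opt/certs/trustStore.jks")
  .set("spark.kubernetes.driverSubmitTimeout", "120s")
```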

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 119 additions & 132 deletions
Large diffs are not rendered by default.
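
Since Client.scala's diff is collapsed, here is a hedged sketch of the pattern the commit message's "Refactor more ThreadUtils" and "use cached thread pool" bullets refer to, using Spark's internal ThreadUtils helper. The pool name and the task are assumptions; the actual call sites are in the unrendered diff:

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

import org.apache.spark.util.ThreadUtils

// Assumed usage: ThreadUtils creates daemon cached thread pools with a
// readable name prefix; "kubernetes-client" is a placeholder name.
val threadPool = ThreadUtils.newDaemonCachedThreadPool("kubernetes-client")
implicit val ec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(threadPool)

// Asynchronous work, e.g. polling the driver pod, then runs on the named pool.
val submitDone = Future { /* poll the driver pod status here */ }

threadPool.shutdown()
threadPool.awaitTermination(30, TimeUnit.SECONDS)
```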
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala (new file; path inferred from the package declaration below)

Lines changed: 177 additions & 0 deletions
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.internal.config.ConfigBuilder

package object config {

  private[spark] val KUBERNETES_NAMESPACE =
    ConfigBuilder("spark.kubernetes.namespace")
      .doc("""
          | The namespace that will be used for running the driver and
          | executor pods. When using spark-submit in cluster mode,
          | this can also be passed to spark-submit via the
          | --kubernetes-namespace command line argument.
        """.stripMargin)
      .stringConf
      .createWithDefault("default")

  private[spark] val DRIVER_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.driver.docker.image")
      .doc("""
          | Docker image to use for the driver. Specify this using the
          | standard Docker tag format.
        """.stripMargin)
      .stringConf
      .createWithDefault(s"spark-driver:$sparkVersion")

  private[spark] val EXECUTOR_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.executor.docker.image")
      .doc("""
          | Docker image to use for the executors. Specify this using
          | the standard Docker tag format.
        """.stripMargin)
      .stringConf
      .createWithDefault(s"spark-executor:$sparkVersion")

  private[spark] val KUBERNETES_CA_CERT_FILE =
    ConfigBuilder("spark.kubernetes.submit.caCertFile")
      .doc("""
          | CA cert file for connecting to Kubernetes over SSL. This
          | file should be located on the submitting machine's disk.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_CLIENT_KEY_FILE =
    ConfigBuilder("spark.kubernetes.submit.clientKeyFile")
      .doc("""
          | Client key file for authenticating against the Kubernetes
          | API server. This file should be located on the submitting
          | machine's disk.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_CLIENT_CERT_FILE =
    ConfigBuilder("spark.kubernetes.submit.clientCertFile")
      .doc("""
          | Client cert file for authenticating against the
          | Kubernetes API server. This file should be located on
          | the submitting machine's disk.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
    ConfigBuilder("spark.kubernetes.submit.serviceAccountName")
      .doc("""
          | Service account that is used when running the driver pod.
          | The driver pod uses this service account when requesting
          | executor pods from the API server.
        """.stripMargin)
      .stringConf
      .createWithDefault("default")

  private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
    ConfigBuilder("spark.kubernetes.driver.uploads.jars")
      .doc("""
          | Comma-separated list of jars to send to the driver and
          | all executors when submitting the application in cluster
          | mode.
        """.stripMargin)
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
  private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
    ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
      .doc("""
          | The amount of off-heap memory (in megabytes) to be
          | allocated per executor. This is memory that accounts for
          | things like VM overheads, interned strings, other native
          | overheads, etc. This tends to grow with the executor size
          | (typically 6-10%).
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_LABELS =
    ConfigBuilder("spark.kubernetes.driver.labels")
      .doc("""
          | Custom labels that will be added to the driver pod.
          | This should be a comma-separated list of label key-value
          | pairs, where each label is in the format key=value. Note
          | that Spark also adds its own labels to the driver pod
          | for bookkeeping purposes.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
    ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
      .doc("""
          | Time to wait for the driver process to start running
          | before aborting its execution.
        """.stripMargin)
      .timeConf(TimeUnit.SECONDS)
      .createWithDefault(60L)

  private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE =
    ConfigBuilder("spark.ssl.kubernetes.submit.keyStore")
      .doc("""
          | KeyStore file for the driver submission server listening
          | on SSL. Can be pre-mounted on the driver container
          | or uploaded from the submitting client.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE =
    ConfigBuilder("spark.ssl.kubernetes.submit.trustStore")
      .doc("""
          | TrustStore containing certificates for communicating
          | to the driver submission server over SSL.
        """.stripMargin)
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
    ConfigBuilder("spark.kubernetes.driver.service.name")
      .doc("""
          | Kubernetes service that exposes the driver pod
          | for external access.
        """.stripMargin)
      .internal()
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_POD_NAME =
    ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("""
          | Name of the driver pod.
        """.stripMargin)
      .internal()
      .stringConf
      .createOptional
}
```
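
As a usage sketch (in-Spark code, since these entries and `SparkConf.get(entry)` are `private[spark]`): entries built with `ConfigBuilder` are read with their declared types and defaults instead of raw string keys:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

val sparkConf = new SparkConf()

// Typed reads: the result type and default come from the ConfigEntry itself.
val namespace: String = sparkConf.get(KUBERNETES_NAMESPACE)             // "default"
val caCert: Option[String] = sparkConf.get(KUBERNETES_CA_CERT_FILE)     // None unless set
val timeoutSecs: Long = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) // 60
```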
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala (new file; path inferred from the package declaration below)

Lines changed: 70 additions & 0 deletions
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

package object constants {
  // Labels
  private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
  private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
  private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
  private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"

  // Secrets
  private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission"
  private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret"
  private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
  private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
  private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
    "spark-submission-server-key-password"
  private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
    "spark-submission-server-keystore-password"
  private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
  private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
  private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"

  // Default and fixed ports
  private[spark] val SUBMISSION_SERVER_PORT = 7077
  private[spark] val DEFAULT_DRIVER_PORT = 7078
  private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
  private[spark] val DEFAULT_UI_PORT = 4040
  private[spark] val UI_PORT_NAME = "spark-ui-port"
  private[spark] val SUBMISSION_SERVER_PORT_NAME = "submit-server"
  private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
  private[spark] val DRIVER_PORT_NAME = "driver"
  private[spark] val EXECUTOR_PORT_NAME = "executor"

  // Environment Variables
  private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION"
  private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
  private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
    "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
    "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
  private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
  private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL"
  private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
  private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
  private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
  private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
  private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
  private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"

  // Miscellaneous
  private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
}
```
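
A hedged sketch of how the constants object might be consumed; the helper below and the value it assigns to SPARK_DRIVER_LABEL are hypothetical, not taken from this commit (the real call sites live elsewhere in the kubernetes module):

```scala
import org.apache.spark.deploy.kubernetes.constants._

// Hypothetical helper: merges Spark's bookkeeping labels with user-supplied
// ones. The "true" value for SPARK_DRIVER_LABEL is an assumption for the sketch.
def driverPodLabels(
    appId: String,
    appName: String,
    customLabels: Map[String, String]): Map[String, String] = {
  customLabels ++ Map(
    SPARK_APP_ID_LABEL -> appId,
    SPARK_APP_NAME_LABEL -> appName,
    SPARK_DRIVER_LABEL -> "true")
}
```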

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala

Lines changed: 10 additions & 11 deletions
```diff
@@ -20,23 +20,22 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
 
 import org.apache.spark.SPARK_VERSION
 
-// TODO: jars should probably be compressed. Shipping tarballs would be optimal.
 case class KubernetesCreateSubmissionRequest(
-    val appResource: AppResource,
-    val mainClass: String,
-    val appArgs: Array[String],
-    val sparkProperties: Map[String, String],
-    val secret: String,
-    val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+    appResource: AppResource,
+    mainClass: String,
+    appArgs: Array[String],
+    sparkProperties: Map[String, String],
+    secret: String,
+    uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
   message = "create"
   clientSparkVersion = SPARK_VERSION
 }
 
 case class TarGzippedData(
-  val dataBase64: String,
-  val blockSize: Int = 10240,
-  val recordSize: Int = 512,
-  val encoding: String
+  dataBase64: String,
+  blockSize: Int = 10240,
+  recordSize: Int = 512,
+  encoding: String
 )
 
 @JsonTypeInfo(
```
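
The dropped `val` modifiers are purely stylistic: Scala case class constructor parameters are already public, immutable fields. A minimal sketch of the equivalence (class names here are illustrative):

```scala
// Both classes expose `name` as a public, immutable field; the explicit
// `val` is redundant on case class parameters and against Spark style.
case class WithVal(val name: String)
case class WithoutVal(name: String)

assert(WithVal("x").name == WithoutVal("x").name)
```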

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -28,12 +28,11 @@ trait KubernetesSparkRestApi {
   @Consumes(Array(MediaType.APPLICATION_JSON))
   @Produces(Array(MediaType.APPLICATION_JSON))
   @Path("/create")
-  def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
+  def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
 
   @GET
   @Consumes(Array(MediaType.APPLICATION_JSON))
   @Produces(Array(MediaType.APPLICATION_JSON))
   @Path("/ping")
   def ping(): PingResponse
-
 }
```
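
The `create` => `submitApplication` rename is invisible to HTTP clients, because JAX-RS routes on the `@Path` annotation rather than the Scala method name. An illustrative trait (not from the commit) demonstrating the point:

```scala
import javax.ws.rs.{Consumes, POST, Path, Produces}
import javax.ws.rs.core.MediaType

// Illustrative only: clients still POST to .../create even though the
// Scala method carries a more descriptive name.
trait ExampleRestApi {
  @POST
  @Consumes(Array(MediaType.APPLICATION_JSON))
  @Produces(Array(MediaType.APPLICATION_JSON))
  @Path("/create")
  def submitApplication(request: String): String
}
```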
