This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit c35fe48

Addressed another round of comments
1 parent 12f2797 commit c35fe48

File tree: 13 files changed (+65, -41 lines)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 3 deletions
@@ -261,7 +261,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
       case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
       case _ =>
-        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
+        printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
         -1
     }

@@ -296,6 +296,10 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }

+    if (clusterManager == KUBERNETES) {
+      args.master = Utils.checkAndGetK8sMasterUrl(args.master)
+    }
+
     // Fail fast, the following modes are not supported or applicable
     (clusterManager, deployMode) match {
       case (STANDALONE, CLUSTER) if args.isPython =>

@@ -304,12 +308,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         printErrorAndExit("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, CLIENT) =>
-        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isPython =>
         printErrorAndExit("Python applications are currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         printErrorAndExit("R applications are currently not supported for Kubernetes.")
+      case (KUBERNETES, CLIENT) =>
+        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
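
Note on the last hunk above: because Scala match cases are tried top to bottom, moving the (KUBERNETES, CLIENT) case below the Python and R cases changes which error message wins when several apply, so a Python or R application submitted in client mode against Kubernetes now gets the more specific language-support error. A self-contained sketch of that ordering effect, with simplified names and tuple shape (not SparkSubmit's actual types):

// Illustrative only: match-case ordering decides which error is reported first.
def firstError(isPython: Boolean, deployMode: String): String =
  (isPython, deployMode) match {
    case (true, _)     => "Python applications are currently not supported for Kubernetes."
    case (_, "client") => "Client mode is currently not supported for Kubernetes."
    case _             => "ok"
  }

firstError(isPython = true, deployMode = "client")
// With the Python case listed first, as in this commit, the Python message is returned
// rather than the client-mode one.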

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 6 deletions
@@ -301,9 +301,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }

   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://")
-        && !master.startsWith("mesos://")
-        && !master.startsWith("k8s://")) {
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
       SparkSubmit.printErrorAndExit(
         "Killing submissions is only supported in standalone or Mesos mode!")
     }

@@ -313,9 +311,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }

   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://")
-        && !master.startsWith("mesos://")
-        && !master.startsWith("k8s://")) {
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
       SparkSubmit.printErrorAndExit(
         "Requesting submission statuses is only supported in standalone or Mesos mode!")
     }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 19 additions & 0 deletions
@@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging {
     }
   }

+  /**
+   * Check the validity of the given Kubernetes master URL and return the resolved URL.
+   */
+  def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
+    require(rawMasterURL.startsWith("k8s://"),
+      "Kubernetes master URL must start with k8s://.")
+    val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
+    if (masterWithoutK8sPrefix.startsWith("https://")) {
+      masterWithoutK8sPrefix
+    } else if (masterWithoutK8sPrefix.startsWith("http://")) {
+      logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
+      masterWithoutK8sPrefix
+    } else {
+      val resolvedURL = s"https://$masterWithoutK8sPrefix"
+      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
+        s"URL is $resolvedURL.")
+      resolvedURL
+    }
+  }
 }

 private[util] object CallerContext extends Logging {
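
For reference, a self-contained sketch of the same resolution rules with assumed example values (the apiserver host and ports are placeholders); the first two cases mirror the UtilsSuite test added below, and the third shows the default-to-https path:

// Standalone sketch, not Spark code; checkAndGetK8sMasterUrl applies the same rules.
def resolveK8sMaster(raw: String): String = {
  require(raw.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.")
  val rest = raw.stripPrefix("k8s://")
  if (rest.startsWith("https://") || rest.startsWith("http://")) rest
  else s"https://$rest"
}

resolveK8sMaster("k8s://https://apiserver:6443") // "https://apiserver:6443"
resolveK8sMaster("k8s://http://apiserver:8080")  // "http://apiserver:8080" (Spark also logs a warning)
resolveK8sMaster("k8s://apiserver:6443")         // "https://apiserver:6443" (no scheme, https assumed)
// resolveK8sMaster("k8s:https://apiserver:6443") fails the require with an IllegalArgumentException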

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -1146,6 +1146,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     }
   }

+  test("check Kubernetes master URL") {
+    val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
+    assert(k8sMasterURLHttps == "https://host:port")
+
+    val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
+    assert(k8sMasterURLHttp == "http://host:port")
+
+    intercept[IllegalArgumentException] {
+      Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
+    }
+  }
 }

 private class SimpleExtension

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 0 additions & 15 deletions
@@ -180,19 +180,4 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

   val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
-
-  def getK8sMasterUrl(rawMasterString: String): String = {
-    require(rawMasterString.startsWith("k8s://"),
-      "Master URL should start with k8s:// in Kubernetes mode.")
-    val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
-    if (masterWithoutK8sPrefix.startsWith("http://")
-        || masterWithoutK8sPrefix.startsWith("https://")) {
-      masterWithoutK8sPrefix
-    } else {
-      val resolvedURL = s"https://$masterWithoutK8sPrefix"
-      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
-        s"URL is $resolvedURL")
-      resolvedURL
-    }
-  }
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala

Lines changed: 3 additions & 3 deletions
@@ -20,7 +20,6 @@ import java.util.{Collections, UUID}

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.control.NonFatal

 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient

@@ -150,7 +149,7 @@ private[spark] class Client(
         kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
       }
     } catch {
-      case NonFatal(e) =>
+      case e: Throwable =>
         kubernetesClient.pods().delete(createdDriverPod)
         throw e
     }

@@ -198,7 +197,8 @@ private[spark] object Client extends SparkApplication {
     val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
-    val master = getK8sMasterUrl(sparkConf.get("spark.master"))
+    // The master URL has been checked for validity already in SparkSubmit.
+    val master = sparkConf.get("spark.master")
     val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter(_ => waitForAppCompletion)

     val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
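
The switch from NonFatal(e) to e: Throwable in the second hunk widens the cleanup path: the driver pod is now deleted even when a fatal error (for example an OutOfMemoryError) escapes resource creation, and the error is still rethrown rather than swallowed. A stripped-down sketch of that pattern, with placeholder function parameters standing in for the Kubernetes client calls:

// Placeholder names; only the catch/cleanup/rethrow shape mirrors the change above.
def submitWithCleanup(createResources: () => Unit, deleteDriverPod: () => Unit): Unit = {
  try {
    createResources()
  } catch {
    case e: Throwable =>
      // NonFatal(e) would skip cleanup for fatal errors; catching Throwable and
      // rethrowing keeps the failure visible while still removing the driver pod.
      deleteDriverPod()
      throw e
  }
}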

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala

Lines changed: 0 additions & 1 deletion
@@ -101,7 +101,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   }

   private def formatPodState(pod: Pod): String = {
-    // TODO include specific container state
     val details = Seq[(String, String)](
       // pod metadata
       ("pod name", pod.getMetadata.getName),

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[spark] class DriverServiceBootstrapStep(
     val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
     val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
     val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
+      .set(DRIVER_HOST_KEY, driverHostname)
       .set("spark.driver.port", driverPort.toString)
       .set(
         org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec

-private[spark] class DependencyResolutionStepSuite extends SparkFunSuite {
+class DependencyResolutionStepSuite extends SparkFunSuite {

   private val SPARK_JARS = Seq(
     "hdfs://localhost:9000/apps/jars/jar1.jar",

resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile

Lines changed: 6 additions & 3 deletions
@@ -17,9 +17,12 @@

 FROM spark-base

-# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
-# command should be invoked from the top level directory of the Spark distribution. E.g.:
-# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+# Before building the docker image, first build and make a Spark distribution following
+# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
+# If this docker file is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory
+# of the Spark distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/spark-base/Dockerfile .

 COPY examples /opt/spark/examples