Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,7 @@ private[spark] object Utils extends Logging {

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* "k8s://" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
Expand All @@ -2770,7 +2770,7 @@ private[spark] object Utils extends Logging {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"
return s"k8s://$resolvedURL"
}

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
Expand All @@ -2789,7 +2789,7 @@ private[spark] object Utils extends Logging {
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}

return s"k8s:$resolvedURL"
s"k8s://$resolvedURL"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class SparkSubmitSuite
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.master") should be ("k8s://https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1148,16 +1148,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps === "k8s:https://host:port")
assert(k8sMasterURLHttps === "k8s://https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp === "k8s:http://host:port")
assert(k8sMasterURLHttp === "k8s://http://host:port")

val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s://https://127.0.0.1:8443")

val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s://https://127.0.0.1")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s:" prefix here.
val master = sparkConf.get("spark.master").substring("k8s:".length)
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
Expand Down