Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,26 @@ class SparkProcessBuilder(
buffer += s"${convertConfigKey(k)}=$v"
}

// iff the keytab is specified, PROXY_USER is not supported
if (!useKeytab()) {
buffer += PROXY_USER
buffer += proxyUser
// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String): Unit = {
clusterManager().foreach(cm => {
if (cm.startsWith("k8s://")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no harm to set for all cluster managers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.kubernetes.driverEnv. only affect on kubernetes case. We are only setting up this On Kubernetes for now.
As for SPARK_USER_NAME this env, it's affect in spark/kubernetes/entrypoint.sh

buffer += CONF
buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName"
buffer += CONF
buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName"
}
})
}

// if the keytab is specified, PROXY_USER is not supported
tryKeytab() match {
case None =>
setSparkUserName(proxyUser)
buffer += PROXY_USER
buffer += proxyUser
case Some(name) =>
setSparkUserName(name)
}

mainResource.foreach { r => buffer += r }
Expand All @@ -104,26 +120,27 @@ class SparkProcessBuilder(

override protected def module: String = "kyuubi-spark-sql-engine"

private def useKeytab(): Boolean = {
private def tryKeytab(): Option[String] = {
val principal = conf.getOption(PRINCIPAL)
val keytab = conf.getOption(KEYTAB)
if (principal.isEmpty || keytab.isEmpty) {
false
None
} else {
try {
val ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
val keytabEnabled = ugi.getShortUserName == proxyUser
if (!keytabEnabled) {
if (ugi.getShortUserName != proxyUser) {
warn(s"The session proxy user: $proxyUser is not same with " +
s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " +
s"Fallback to use proxy user.")
None
} else {
Some(ugi.getShortUserName)
}
keytabEnabled
} catch {
case e: IOException =>
error(s"Failed to login for ${principal.get}", e)
false
None
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,34 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
assert(!pb.toString.contains(engineRefId2))
assert(pb.toString.contains(engineRefId))
}

test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") {
val proxyName = "kyuubi"
val conf1 = KyuubiConf(false).set("spark.master", "k8s://test:12345")
val b1 = new SparkProcessBuilder(proxyName, conf1)
val c1 = b1.toString.split(' ')
assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName"))
assert(c1.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName"))

tryWithSecurityEnabled {
val conf2 = conf.set("spark.master", "k8s://test:12345")
.set("spark.kerberos.principal", testPrincipal)
.set("spark.kerberos.keytab", testKeytab)
val name = ServiceUtils.getShortName(testPrincipal)
val b2 = new SparkProcessBuilder(name, conf2)
val c2 = b2.toString.split(' ')
assert(c2.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$name"))
assert(c2.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$name"))
assert(!c2.contains(s"--proxy-user $name"))
}

// Test no-kubernetes case
val conf3 = KyuubiConf(false)
val b3 = new SparkProcessBuilder(proxyName, conf3)
val c3 = b3.toString.split(' ')
assert(!c3.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName"))
assert(!c3.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName"))
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)
Expand Down