From 48b9b225c687f673102ae97857ecb638e66b11ea Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 20 Sep 2022 16:52:14 +0800 Subject: [PATCH 1/3] add --- .../engine/spark/SparkProcessBuilder.scala | 27 ++++++++++++++----- .../spark/SparkProcessBuilderSuite.scala | 20 ++++++++++++++ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 85ee8349152..eb2daa742f5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -91,8 +91,20 @@ class SparkProcessBuilder( buffer += s"${convertConfigKey(k)}=$v" } + // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user + def setSparkUserName(userName: String): Unit = { + buffer += CONF + buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName" + buffer += CONF + buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName" + } + // iff the keytab is specified, PROXY_USER is not supported - if (!useKeytab()) { + val shortUserName = useKeytab() + if (shortUserName.nonEmpty) { + setSparkUserName(shortUserName.get) + } else { + setSparkUserName(proxyUser) buffer += PROXY_USER buffer += proxyUser } @@ -104,26 +116,27 @@ class SparkProcessBuilder( override protected def module: String = "kyuubi-spark-sql-engine" - private def useKeytab(): Boolean = { + private def useKeytab(): Option[String] = { val principal = conf.getOption(PRINCIPAL) val keytab = conf.getOption(KEYTAB) if (principal.isEmpty || keytab.isEmpty) { - false + None } else { try { val ugi = UserGroupInformation .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get) - val keytabEnabled = ugi.getShortUserName == proxyUser - if (!keytabEnabled) { + if (ugi.getShortUserName != proxyUser) { warn(s"The session proxy user: $proxyUser is not same with " + s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " + s"Fallback to use proxy user.") + None + } else { + Some(ugi.getShortUserName) } - keytabEnabled } catch { case e: IOException => error(s"Failed to login for ${principal.get}", e) - false + None } } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index b6a6b7c2ef3..bd1b2815448 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -270,6 +270,26 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { assert(!pb.toString.contains(engineRefId2)) assert(pb.toString.contains(engineRefId)) } + + test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") { + val proxyName = "kyuubi" + val conf1 = KyuubiConf(false) + val b1 = new SparkProcessBuilder(proxyName, conf1) + val c1 = b1.toString.split(' ') + assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName")) + assert(c1.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName")) + + tryWithSecurityEnabled { + val conf2 = conf.set("spark.kerberos.principal", testPrincipal) + .set("spark.kerberos.keytab", testKeytab) + val name = ServiceUtils.getShortName(testPrincipal) + val b2 = new SparkProcessBuilder(name, conf2) + val c2 = b2.toString.split(' ') + assert(c2.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$name")) + assert(c2.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$name")) + assert(!c2.contains(s"--proxy-user $name")) + } + } } class FakeSparkProcessBuilder(config: KyuubiConf) From ddd713fa9039f9a30b83d6e533496d7b6d0ded1d Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 20 Sep 2022 17:12:46 +0800 Subject: [PATCH 2/3] fix --- .../engine/spark/SparkProcessBuilder.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index eb2daa742f5..aeb06babac9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -99,14 +99,14 @@ class SparkProcessBuilder( buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName" } - // iff the keytab is specified, PROXY_USER is not supported - val shortUserName = useKeytab() - if (shortUserName.nonEmpty) { - setSparkUserName(shortUserName.get) - } else { - setSparkUserName(proxyUser) - buffer += PROXY_USER - buffer += proxyUser + // if the keytab is specified, PROXY_USER is not supported + tryKeytab() match { + case None => + setSparkUserName(proxyUser) + buffer += PROXY_USER + buffer += proxyUser + case Some(name) => + setSparkUserName(name) } mainResource.foreach { r => buffer += r } @@ -116,7 +116,7 @@ class SparkProcessBuilder( override protected def module: String = "kyuubi-spark-sql-engine" - private def useKeytab(): Option[String] = { + private def tryKeytab(): Option[String] = { val principal = conf.getOption(PRINCIPAL) val keytab = conf.getOption(KEYTAB) if (principal.isEmpty || keytab.isEmpty) { From 9596372cea94093fba5d07af05835c5e041dba73 Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Wed, 21 Sep 2022 09:49:58 +0800 Subject: [PATCH 3/3] only k8s case --- .../kyuubi/engine/spark/SparkProcessBuilder.scala | 12 ++++++++---- .../engine/spark/SparkProcessBuilderSuite.scala | 12 ++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index aeb06babac9..3b5703bdd4c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -93,10 +93,14 @@ class SparkProcessBuilder( // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user def setSparkUserName(userName: String): Unit = { - buffer += CONF - buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName" - buffer += CONF - buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName" + clusterManager().foreach(cm => { + if (cm.startsWith("k8s://")) { + buffer += CONF + buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName" + buffer += CONF + buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName" + } + }) } // if the keytab is specified, PROXY_USER is not supported diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index bd1b2815448..550940fc5f8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -273,14 +273,15 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") { val proxyName = "kyuubi" - val conf1 = KyuubiConf(false) + val conf1 = KyuubiConf(false).set("spark.master", "k8s://test:12345") val b1 = new SparkProcessBuilder(proxyName, conf1) val c1 = b1.toString.split(' ') assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName")) assert(c1.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName")) tryWithSecurityEnabled { - val conf2 = conf.set("spark.kerberos.principal", testPrincipal) + val conf2 = conf.set("spark.master", "k8s://test:12345") + .set("spark.kerberos.principal", testPrincipal) .set("spark.kerberos.keytab", testKeytab) val name = ServiceUtils.getShortName(testPrincipal) val b2 = new SparkProcessBuilder(name, conf2) @@ -289,6 +290,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { assert(c2.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$name")) assert(!c2.contains(s"--proxy-user $name")) } + + // Test no-kubernetes case + val conf3 = KyuubiConf(false) + val b3 = new SparkProcessBuilder(proxyName, conf3) + val c3 = b3.toString.split(' ') + assert(!c3.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName")) + assert(!c3.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName")) } }