diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala index c890cee59ffe0..aec0f72feb3c1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala @@ -143,14 +143,11 @@ private[spark] object KafkaTokenUtil extends Logging { } private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = { - val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) - require(serviceName.nonEmpty, "Kerberos service name must be defined") - val params = s""" |${getKrb5LoginModuleName} required | useKeyTab=true - | serviceName="${serviceName.get}" + | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" | keyTab="${sparkConf.get(KEYTAB).get}" | principal="${sparkConf.get(PRINCIPAL).get}"; """.stripMargin.replace("\n", "") @@ -166,7 +163,7 @@ private[spark] object KafkaTokenUtil extends Logging { s""" |${getKrb5LoginModuleName} required | useTicketCache=true - | serviceName="${serviceName.get}"; + | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"; """.stripMargin.replace("\n", "") logDebug(s"Krb ticket cache JAAS params: $params") params diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala index 85d74c27142ad..064fc93cb8ed8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala @@ -40,7 +40,7 @@ private[spark] object Kafka { "Kafka's JAAS config or in Kafka's config. For further details please see kafka " + "documentation. Only used to obtain delegation token.") .stringConf - .createOptional + .createWithDefault("kafka") val TRUSTSTORE_LOCATION = ConfigBuilder("spark.kafka.ssl.truststore.location") diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala index 682bebde916fa..18aa537b3a51d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala @@ -36,7 +36,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { private val keyStorePassword = "keyStoreSecret" private val keyPassword = "keySecret" private val keytab = "/path/to/keytab" - private val kerberosServiceName = "kafka" private val principal = "user@domain.com" private var sparkConf: SparkConf = null @@ -96,7 +95,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) @@ -119,7 +117,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) @@ -143,7 +140,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) @@ -177,7 +173,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) sparkConf.set(KEYTAB, keytab) - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) sparkConf.set(PRINCIPAL, principal) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) @@ -195,7 +190,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") { sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) @@ -218,22 +212,4 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided) } - - test("getKeytabJaasParams with keytab no service should throw exception") { - sparkConf.set(KEYTAB, keytab) - - val thrown = intercept[IllegalArgumentException] { - KafkaTokenUtil.getKeytabJaasParams(sparkConf) - } - - assert(thrown.getMessage contains "Kerberos service name must be defined") - } - - test("getTicketCacheJaasParams without service should throw exception") { - val thrown = intercept[IllegalArgumentException] { - KafkaTokenUtil.getTicketCacheJaasParams(sparkConf) - } - - assert(thrown.getMessage contains "Kerberos service name must be defined") - } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala index 74d5ef9c05f14..7215295b10091 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.kafka010 import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.kafka.common.security.scram.ScramLoginModule import org.apache.spark.SparkConf @@ -35,8 +34,6 @@ private[kafka010] object KafkaSecurityHelper extends Logging { def getTokenJaasParams(sparkConf: SparkConf): String = { val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( KafkaTokenUtil.TOKEN_SERVICE) - val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) - require(serviceName.isDefined, "Kerberos service name must be defined") val username = new String(token.getIdentifier) val password = new String(token.getPassword) @@ -45,7 +42,7 @@ private[kafka010] object KafkaSecurityHelper extends Logging { s""" |$loginModuleName required | tokenauth=true - | serviceName="${serviceName.get}" + | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" | username="$username" | password="$password"; """.stripMargin.replace("\n", "") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala index 772fe4614bad0..fd9dee390d185 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala @@ -26,12 +26,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.security.KafkaTokenUtil import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier -import org.apache.spark.internal.config._ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { - private val keytab = "/path/to/keytab" - private val kerberosServiceName = "kafka" - private val principal = "user@domain.com" private val tokenId = "tokenId" + UUID.randomUUID().toString private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString @@ -76,19 +72,8 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { assert(KafkaSecurityHelper.isTokenAvailable()) } - test("getTokenJaasParams with token no service should throw exception") { - addTokenToUGI() - - val thrown = intercept[IllegalArgumentException] { - KafkaSecurityHelper.getTokenJaasParams(sparkConf) - } - - assert(thrown.getMessage contains "Kerberos service name must be defined") - } - test("getTokenJaasParams with token should return scram module") { addTokenToUGI() - sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)