From 7e445f72660de2aabd558ae0eb98968b2ab794e8 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Wed, 8 May 2019 15:45:08 +0200
Subject: [PATCH 1/3] [SPARK-27748][SS] Kafka consumer/producer password/token
 redaction.

---
 .../sql/kafka010/CachedKafkaProducer.scala    | 26 +++--
 .../spark/kafka010/KafkaConfigUpdater.scala   | 12 ++-
 .../spark/kafka010/KafkaRedactionUtil.scala   | 48 ++++++++++
 .../spark/kafka010/KafkaTokenUtil.scala       | 10 +-
 .../kafka010/KafkaConfigUpdaterSuite.scala    | 13 +--
 .../kafka010/KafkaDelegationTokenTest.scala   | 32 +++++++
 .../kafka010/KafkaRedactionUtilSuite.scala    | 96 +++++++++++++++++++
 .../spark/kafka010/KafkaTokenUtilSuite.scala  | 44 ++-------
 8 files changed, 221 insertions(+), 60 deletions(-)
 create mode 100644 external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
 create mode 100644 external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index ce22e3f2c889..663a28ceb5dd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -28,7 +28,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.KafkaConfigUpdater
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
 
 private[kafka010] object CachedKafkaProducer extends Logging {
 
@@ -42,8 +42,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
 
   private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
     override def load(config: Seq[(String, Object)]): Producer = {
-      val configMap = config.map(x => x._1 -> x._2).toMap.asJava
-      createKafkaProducer(configMap)
+      createKafkaProducer(config)
     }
   }
 
@@ -52,8 +51,11 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
       val paramsSeq: Seq[(String, Object)] = notification.getKey
       val producer: Producer = notification.getValue
-      logDebug(
-        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
+      if (log.isDebugEnabled()) {
+        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+        logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " +
+          s"due to ${notification.getCause}")
+      }
       close(paramsSeq, producer)
     }
   }
@@ -63,9 +65,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     .removalListener(removalListener)
     .build[Seq[(String, Object)], Producer](cacheLoader)
 
-  private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
-    val kafkaProducer: Producer = new Producer(producerConfiguration)
-    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
+  private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
+    val kafkaProducer: Producer = new Producer(paramsSeq.map(x => x._1 -> x._2).toMap.asJava)
+    if (log.isDebugEnabled()) {
+      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
+    }
     kafkaProducer
   }
 
@@ -103,7 +108,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
   /** Auto close on cache evict */
   private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = {
     try {
-      logInfo(s"Closing the KafkaProducer with params: ${paramsSeq.mkString("\n")}.")
+      if (log.isInfoEnabled()) {
+        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+        logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
+      }
       producer.close()
     } catch {
       case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
index 38f3b98c6ace..0c61045d6d48 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
@@ -36,14 +36,22 @@ private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[St
 
   def set(key: String, value: Object): this.type = {
     map.put(key, value)
-    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+    if (log.isDebugEnabled()) {
+      val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
+      val redactedOldValue = KafkaRedactionUtil
+        .redactParams(Seq((key, kafkaParams.getOrElse(key, "")))).head._2
+      logDebug(s"$module: Set $key to $redactedValue, earlier value: $redactedOldValue")
+    }
     this
   }
 
   def setIfUnset(key: String, value: Object): this.type = {
     if (!map.containsKey(key)) {
       map.put(key, value)
-      logDebug(s"$module: Set $key to $value")
+      if (log.isDebugEnabled()) {
+        val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
+        logDebug(s"$module: Set $key to $redactedValue")
+      }
     }
     this
   }
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
new file mode 100644
index 000000000000..9e690650eb58
--- /dev/null
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
+import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
+
+private[spark] object KafkaRedactionUtil extends Logging {
+  private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, Object)] = {
+    val redactionPattern = SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN)
+    params.map { case (key, value) =>
+      if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
+        (key, redactJaasParam(value.asInstanceOf[String]))
+      } else {
+        val (_, newValue) = redact(Some(redactionPattern), Seq((key, value.asInstanceOf[String])))
+          .head
+        (key, newValue)
+      }
+    }
+  }
+
+  private[kafka010] def redactJaasParam(param: String): String = {
+    if (param != null && !param.isEmpty) {
+      param.replaceAll("password=\".*\"", s"""password="$REDACTION_REPLACEMENT_TEXT"""")
+    } else {
+      param
+    }
+  }
+}
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 7078b4fd964e..da21d2e2413d 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] object KafkaTokenUtil extends Logging {
   val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
@@ -193,7 +194,7 @@ private[spark] object KafkaTokenUtil extends Logging {
       | debug=${isGlobalKrbDebugEnabled()}
       | useTicketCache=true
       | serviceName="${clusterConf.kerberosServiceName}";
-    """.stripMargin.replace("\n", "")
+    """.stripMargin.replace("\n", "").trim
     logDebug(s"Krb ticket cache JAAS params: $params")
     params
   }
@@ -226,7 +227,8 @@ private[spark] object KafkaTokenUtil extends Logging {
     logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
       "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
     val tokenInfo = token.tokenInfo
-    logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+    logDebug("%-15s %-15s %-15s %-25s %-15s %-15s %-15s".format(
+      REDACTION_REPLACEMENT_TEXT,
       tokenInfo.tokenId,
       tokenInfo.owner,
       tokenInfo.renewersAsString,
@@ -268,8 +270,8 @@ private[spark] object KafkaTokenUtil extends Logging {
       | serviceName="${clusterConf.kerberosServiceName}"
       | username="$username"
       | password="$password";
-    """.stripMargin.replace("\n", "")
-    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
+    """.stripMargin.replace("\n", "").trim
+    logDebug(s"Scram JAAS params: ${KafkaRedactionUtil.redactJaasParam(params)}")
     params
   }
 
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
index 8f4cedfb2f0c..7a172892e778 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
@@ -23,13 +23,10 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.spark.SparkFunSuite
 
 class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
-  private val identifier = "cluster1"
-  private val tokenService = KafkaTokenUtil.getTokenService(identifier)
   private val testModule = "testModule"
   private val testKey = "testKey"
   private val testValue = "testValue"
   private val otherTestValue = "otherTestValue"
-  private val bootStrapServers = "127.0.0.1:0"
 
   test("set should always set value") {
     val params = Map.empty[String, String]
@@ -81,10 +78,10 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
     )
     setSparkEnv(
       Map(
-        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers
+        s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
       )
     )
-    addTokenToUGI(tokenService)
+    addTokenToUGI(tokenService1)
 
     val updatedParams = KafkaConfigUpdater(testModule, params)
       .setAuthenticationConfigIfNeeded()
@@ -103,11 +100,11 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
     )
     setSparkEnv(
       Map(
-        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers,
-        s"spark.kafka.clusters.$identifier.sasl.token.mechanism" -> "intentionally_invalid"
+        s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers,
+        s"spark.kafka.clusters.$identifier1.sasl.token.mechanism" -> "intentionally_invalid"
       )
     )
-    addTokenToUGI(tokenService)
+    addTokenToUGI(tokenService1)
 
     val e = intercept[IllegalArgumentException] {
       KafkaConfigUpdater(testModule, params)
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index 2675085bb2cb..74f1cdcf7346 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -40,6 +40,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
 
   protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
   protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString
 
+  protected val identifier1 = "cluster1"
+  protected val identifier2 = "cluster2"
+  protected val tokenService1 = KafkaTokenUtil.getTokenService(identifier1)
+  protected val tokenService2 = KafkaTokenUtil.getTokenService(identifier2)
+  protected val bootStrapServers = "127.0.0.1:0"
+  protected val matchingTargetServersRegex = "127.0.0.*:0"
+  protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0"
+  protected val trustStoreLocation = "/path/to/trustStore"
+  protected val trustStorePassword = "trustStoreSecret"
+  protected val keyStoreLocation = "/path/to/keyStore"
+  protected val keyStorePassword = "keyStoreSecret"
+  protected val keyPassword = "keySecret"
+  protected val keytab = "/path/to/keytab"
+  protected val principal = "user@domain.com"
+
   private class KafkaJaasConfiguration extends Configuration {
     val entry = new AppConfigurationEntry(
@@ -89,4 +104,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
   }
+
+  protected def createClusterConf(
+      identifier: String,
+      securityProtocol: String): KafkaTokenClusterConf = {
+    KafkaTokenClusterConf(
+      identifier,
+      bootStrapServers,
+      KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX,
+      securityProtocol,
+      KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME,
+      Some(trustStoreLocation),
+      Some(trustStorePassword),
+      Some(keyStoreLocation),
+      Some(keyStorePassword),
+      Some(keyPassword),
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+  }
 }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala
new file mode 100644
index 000000000000..3c006550835b
--- /dev/null
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
+
+class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
+  test("redactParams should give back empty parameters") {
+    setSparkEnv(Map())
+    assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
+  }
+
+  test("redactParams should redact token password from parameters") {
+    setSparkEnv(Map())
+    val groupId = "id-" + ju.UUID.randomUUID().toString
+    addTokenToUGI(tokenService1)
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
+    val kafkaParams = Seq(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      SaslConfigs.SASL_JAAS_CONFIG -> jaasParams
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get.asInstanceOf[String]
+      === groupId)
+    val redactedJaasParams = redactedParams.get(SaslConfigs.SASL_JAAS_CONFIG).get
+      .asInstanceOf[String]
+    assert(redactedJaasParams.contains(tokenId))
+    assert(!redactedJaasParams.contains(tokenPassword))
+  }
+
+  test("redactParams should redact passwords from parameters") {
+    setSparkEnv(Map())
+    val groupId = "id-" + ju.UUID.randomUUID().toString
+    val kafkaParams = Seq(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> trustStorePassword,
+      SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> keyStorePassword,
+      SslConfigs.SSL_KEY_PASSWORD_CONFIG -> keyPassword
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+      === groupId)
+    assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).asInstanceOf[String]
+      === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[String]
+      === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG).asInstanceOf[String]
+      === REDACTION_REPLACEMENT_TEXT)
+  }
+
+  test("redactJaasParam should give back null") {
+    assert(KafkaRedactionUtil.redactJaasParam(null) === null)
+  }
+
+  test("redactJaasParam should give back empty string") {
+    assert(KafkaRedactionUtil.redactJaasParam("") === "")
+  }
+
+  test("redactJaasParam should redact token password") {
+    addTokenToUGI(tokenService1)
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
+
+    val redactedJaasParams = KafkaRedactionUtil.redactJaasParam(jaasParams)
+
+    assert(redactedJaasParams.contains(tokenId))
+    assert(!redactedJaasParams.contains(tokenPassword))
+  }
+}
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 11f954b4b2f8..bcca920eed4e 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -29,21 +29,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config._
 
 class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
-  private val identifier1 = "cluster1"
-  private val identifier2 = "cluster2"
-  private val tokenService1 = KafkaTokenUtil.getTokenService(identifier1)
-  private val tokenService2 = KafkaTokenUtil.getTokenService(identifier2)
-  private val bootStrapServers = "127.0.0.1:0"
-  private val matchingTargetServersRegex = "127.0.0.*:0"
-  private val nonMatchingTargetServersRegex = "127.0.1.*:0"
-  private val trustStoreLocation = "/path/to/trustStore"
-  private val trustStorePassword = "trustStoreSecret"
-  private val keyStoreLocation = "/path/to/keyStore"
-  private val keyStorePassword = "keyStoreSecret"
-  private val keyPassword = "keySecret"
-  private val keytab = "/path/to/keytab"
-  private val principal = "user@domain.com"
-
   private var sparkConf: SparkConf = null
 
   override def beforeEach(): Unit = {
@@ -68,7 +53,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
 
   test("createAdminClientProperties with SASL_PLAINTEXT protocol should not include " +
       "keystore and truststore config") {
-    val clusterConf = createClusterConf(SASL_PLAINTEXT.name)
+    val clusterConf = createClusterConf(identifier1, SASL_PLAINTEXT.name)
 
     val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
 
@@ -84,7 +69,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   }
 
   test("createAdminClientProperties with SASL_SSL protocol should include truststore config") {
-    val clusterConf = createClusterConf(SASL_SSL.name)
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
 
     val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
 
@@ -103,7 +88,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
test("createAdminClientProperties with SSL protocol should include keystore and truststore " + "config") { - val clusterConf = createClusterConf(SSL.name) + val clusterConf = createClusterConf(identifier1, SSL.name) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) @@ -121,7 +106,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties with global config should not set dynamic jaas config") { - val clusterConf = createClusterConf(SASL_SSL.name) + val clusterConf = createClusterConf(identifier1, SASL_SSL.name) setGlobalKafkaClientConfig() val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) @@ -137,7 +122,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { test("createAdminClientProperties with keytab should set keytab dynamic jaas config") { sparkConf.set(KEYTAB, keytab) sparkConf.set(PRINCIPAL, principal) - val clusterConf = createClusterConf(SASL_SSL.name) + val clusterConf = createClusterConf(identifier1, SASL_SSL.name) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) @@ -155,7 +140,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") { - val clusterConf = createClusterConf(SASL_SSL.name) + val clusterConf = createClusterConf(identifier1, SASL_SSL.name) val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) @@ -225,7 +210,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { test("getTokenJaasParams with token should return scram module") { addTokenToUGI(tokenService1) - val clusterConf = createClusterConf(SASL_SSL.name) + val clusterConf = createClusterConf(identifier1, SASL_SSL.name) val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf) @@ -234,19 +219,4 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { assert(jaasParams.contains(tokenId)) assert(jaasParams.contains(tokenPassword)) } - - private def createClusterConf(securityProtocol: String): KafkaTokenClusterConf = { - KafkaTokenClusterConf( - identifier1, - bootStrapServers, - KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX, - securityProtocol, - KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME, - Some(trustStoreLocation), - Some(trustStorePassword), - Some(keyStoreLocation), - Some(keyStorePassword), - Some(keyPassword), - KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) - } } From c59163274443376871593680f97578fa77275e18 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 30 May 2019 19:35:37 +0200 Subject: [PATCH 2/3] Fix --- .../spark/sql/kafka010/CachedKafkaProducer.scala | 2 +- .../apache/spark/kafka010/KafkaRedactionUtil.scala | 13 +++++++++---- .../spark/kafka010/KafkaRedactionUtilSuite.scala | 10 ++++++++++ 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 663a28ceb5dd..fc177cdc9037 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -66,7 +66,7 @@ private[kafka010] object 
CachedKafkaProducer extends Logging { .build[Seq[(String, Object)], Producer](cacheLoader) private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = { - val kafkaProducer: Producer = new Producer(paramsSeq.map(x => x._1 -> x._2).toMap.asJava) + val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava) if (log.isDebugEnabled()) { val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.") diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala index 9e690650eb58..fb3a235d1cb2 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala @@ -26,14 +26,19 @@ import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT} private[spark] object KafkaRedactionUtil extends Logging { private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, Object)] = { - val redactionPattern = SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN) + val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN)) params.map { case (key, value) => if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) { (key, redactJaasParam(value.asInstanceOf[String])) } else { - val (_, newValue) = redact(Some(redactionPattern), Seq((key, value.asInstanceOf[String]))) - .head - (key, newValue) + value match { + case s: String => + val (_, newValue) = redact(redactionPattern, Seq((key, s))).head + (key, newValue) + + case _ => + (key, value) + } } } } diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala index 3c006550835b..98acf3d9cdcc 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala @@ -22,6 +22,7 @@ import java.{util => ju} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL +import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkFunSuite import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT @@ -32,6 +33,15 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes assert(KafkaRedactionUtil.redactParams(Seq()) === Seq()) } + test("redactParams should give back non String parameters") { + setSparkEnv(Map()) + val kafkaParams = Seq( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer] + ) + + assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams) + } + test("redactParams should redact token password from parameters") { setSparkEnv(Map()) val groupId = "id-" + ju.UUID.randomUUID().toString From 67aa337ad48e393a2e19f60fc29bca559dd2380f Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 3 Jun 2019 10:22:07 +0200 Subject: [PATCH 3/3] Redact non String params as well + added size assertion --- .../spark/kafka010/KafkaRedactionUtil.scala | 20 ++++---- 
 .../kafka010/KafkaRedactionUtilSuite.scala    | 48 ++++++++++++-------
 2 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
index fb3a235d1cb2..daf2d9adff3b 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala
@@ -25,20 +25,18 @@ import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
 import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
 
 private[spark] object KafkaRedactionUtil extends Logging {
-  private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, Object)] = {
+  private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, String)] = {
     val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN))
     params.map { case (key, value) =>
-      if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
-        (key, redactJaasParam(value.asInstanceOf[String]))
-      } else {
-        value match {
-          case s: String =>
-            val (_, newValue) = redact(redactionPattern, Seq((key, s))).head
-            (key, newValue)
-
-          case _ =>
-            (key, value)
+      if (value != null) {
+        if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
+          (key, redactJaasParam(value.asInstanceOf[String]))
+        } else {
+          val (_, newValue) = redact(redactionPattern, Seq((key, value.toString))).head
+          (key, newValue)
         }
+      } else {
+        (key, value.asInstanceOf[String])
       }
     }
   }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala
index 98acf3d9cdcc..c8b0dac7b213 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala
@@ -25,25 +25,43 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
 import org.apache.kafka.common.serialization.StringDeserializer
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
 import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   test("redactParams should give back empty parameters") {
-    setSparkEnv(Map())
+    setSparkEnv(Map.empty)
     assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
   }
 
-  test("redactParams should give back non String parameters") {
-    setSparkEnv(Map())
+  test("redactParams should give back null value") {
+    setSparkEnv(Map.empty)
     val kafkaParams = Seq(
-      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> null
     )
 
     assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams)
   }
 
+  test("redactParams should redact non String parameters") {
+    setSparkEnv(
+      Map(
+        SECRET_REDACTION_PATTERN.key -> ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
+      )
+    )
+    val kafkaParams = Seq(
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams.size === 1)
+    assert(redactedParams.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).get
+      === REDACTION_REPLACEMENT_TEXT)
+  }
+
   test("redactParams should redact token password from parameters") {
-    setSparkEnv(Map())
+    setSparkEnv(Map.empty)
     val groupId = "id-" + ju.UUID.randomUUID().toString
     addTokenToUGI(tokenService1)
     val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
@@ -55,16 +73,15 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes
 
     val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
 
-    assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get.asInstanceOf[String]
-      === groupId)
+    assert(redactedParams.size === 2)
+    assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get === groupId)
     val redactedJaasParams = redactedParams.get(SaslConfigs.SASL_JAAS_CONFIG).get
-      .asInstanceOf[String]
     assert(redactedJaasParams.contains(tokenId))
     assert(!redactedJaasParams.contains(tokenPassword))
   }
 
   test("redactParams should redact passwords from parameters") {
-    setSparkEnv(Map())
+    setSparkEnv(Map.empty)
     val groupId = "id-" + ju.UUID.randomUUID().toString
     val kafkaParams = Seq(
       ConsumerConfig.GROUP_ID_CONFIG -> groupId,
@@ -75,14 +92,11 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes
 
     val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
 
-    assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-      === groupId)
-    assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).asInstanceOf[String]
-      === REDACTION_REPLACEMENT_TEXT)
-    assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[String]
-      === REDACTION_REPLACEMENT_TEXT)
-    assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG).asInstanceOf[String]
-      === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams.size === 4)
+    assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG) === groupId)
+    assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
   }
 
   test("redactJaasParam should give back null") {