CachedKafkaProducer.scala
@@ -28,7 +28,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.KafkaConfigUpdater
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
 
 private[kafka010] object CachedKafkaProducer extends Logging {
 
@@ -42,8 +42,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
 
   private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
     override def load(config: Seq[(String, Object)]): Producer = {
-      val configMap = config.map(x => x._1 -> x._2).toMap.asJava
-      createKafkaProducer(configMap)
+      createKafkaProducer(config)
     }
   }
 
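For context, the cache above is a Guava `LoadingCache`: on `get`, it returns the producer already cached for a parameter sequence, or calls the loader to create one. A minimal, self-contained sketch of the same pattern, with a toy loader standing in for `createKafkaProducer` (assumes only Guava on the classpath):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object GuavaCacheSketch extends App {
  // Toy loader: uppercases the key instead of building a KafkaProducer.
  val loader = new CacheLoader[String, String] {
    override def load(key: String): String = {
      println(s"loading value for $key")  // runs only on a cache miss
      key.toUpperCase
    }
  }

  val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().maximumSize(16).build[String, String](loader)

  println(cache.get("producer-params"))  // miss: loader runs, value cached
  println(cache.get("producer-params"))  // hit: cached value returned
}
```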
@@ -52,8 +51,11 @@ private[kafka010] object CachedKafkaProducer extends Logging {
         notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
       val paramsSeq: Seq[(String, Object)] = notification.getKey
       val producer: Producer = notification.getValue
-      logDebug(
-        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
+      if (log.isDebugEnabled()) {
+        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+        logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " +
+          s"due to ${notification.getCause}")
+      }
       close(paramsSeq, producer)
     }
   }
@@ -63,9 +65,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     .removalListener(removalListener)
     .build[Seq[(String, Object)], Producer](cacheLoader)
 
-  private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
-    val kafkaProducer: Producer = new Producer(producerConfiguration)
-    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
+  private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
+    val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
+    if (log.isDebugEnabled()) {
+      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
+    }
     kafkaProducer
   }
 
@@ -103,7 +108,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
   /** Auto close on cache evict */
   private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = {
     try {
-      logInfo(s"Closing the KafkaProducer with params: ${paramsSeq.mkString("\n")}.")
+      if (log.isInfoEnabled()) {
+        val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+        logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
+      }
       producer.close()
     } catch {
       case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
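The change repeated through this file is a guard-then-redact pattern: `redactParams` walks every parameter and runs regex matching, so it is wrapped in `log.isDebugEnabled()` / `log.isInfoEnabled()` and skipped entirely when the level is off. A self-contained sketch of the idea; `debugEnabled` and `redact` below are invented stand-ins, not Spark APIs:

```scala
object LogGuardSketch extends App {
  val debugEnabled = false  // stand-in for log.isDebugEnabled()

  // Stand-in for KafkaRedactionUtil.redactParams: hide likely secrets.
  def redact(params: Seq[(String, String)]): Seq[(String, String)] =
    params.map { case (k, v) =>
      if (k.toLowerCase.contains("password")) (k, "*********(redacted)") else (k, v)
    }

  val params = Seq(
    "bootstrap.servers" -> "localhost:9092",
    "ssl.key.password" -> "topsecret")

  // Redaction (and the string interpolation) only happens when the
  // message will actually be emitted.
  if (debugEnabled) {
    println(s"Evicting producer, params: ${redact(params)}")
  }
}
```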
KafkaConfigUpdater.scala
@@ -36,14 +36,22 @@ private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[St
 
   def set(key: String, value: Object): this.type = {
     map.put(key, value)
-    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+    if (log.isDebugEnabled()) {
+      val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
+      val redactedOldValue = KafkaRedactionUtil
+        .redactParams(Seq((key, kafkaParams.getOrElse(key, "")))).head._2
+      logDebug(s"$module: Set $key to $redactedValue, earlier value: $redactedOldValue")
+    }
     this
   }
 
   def setIfUnset(key: String, value: Object): this.type = {
     if (!map.containsKey(key)) {
       map.put(key, value)
-      logDebug(s"$module: Set $key to $value")
+      if (log.isDebugEnabled()) {
+        val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
+        logDebug(s"$module: Set $key to $redactedValue")
+      }
     }
     this
   }
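A hedged usage sketch of the updater after this change: `set` and `setIfUnset` keep their builder-style contract, but a debug log of a secret-looking value now shows Spark's replacement text instead of the secret (assuming the default `spark.redaction.regex`, which matches key names containing "secret" or "password"). The `build()` call and the parameter values below are illustrative, and running this needs the module's test scaffolding since redaction reads the pattern from `SparkEnv`:

```scala
// Builder-style usage (values invented):
val updatedParams = KafkaConfigUpdater("executor", Map("acks" -> "all"))
  .set("ssl.key.password", "topsecret") // debug log shows *********(redacted)
  .setIfUnset("acks", "0")              // key already present: no put, no log
  .build()                              // returns the accumulated java.util.Map
```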
KafkaRedactionUtil.scala (new file)
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
+import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
+
+private[spark] object KafkaRedactionUtil extends Logging {
+  private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, String)] = {
+    val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN))
+    params.map { case (key, value) =>
+      if (value != null) {
+        if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
+          (key, redactJaasParam(value.asInstanceOf[String]))
+        } else {
+          val (_, newValue) = redact(redactionPattern, Seq((key, value.toString))).head
+          (key, newValue)
+        }
+      } else {
+        (key, value.asInstanceOf[String])
+      }
+    }
+  }
+
+  private[kafka010] def redactJaasParam(param: String): String = {
+    if (param != null && !param.isEmpty) {
+      param.replaceAll("password=\".*\"", s"""password="$REDACTION_REPLACEMENT_TEXT"""")
+    } else {
+      param
+    }
+  }
+}
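To make the regex concrete: `redactJaasParam` rewrites the quoted password field of a JAAS fragment and leaves the rest readable. A standalone sketch of the same substitution (the JAAS content is invented, and `REDACTION_REPLACEMENT_TEXT` is assumed to be Spark's usual `*********(redacted)` constant):

```scala
object RedactJaasSketch extends App {
  val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"  // assumed Spark constant

  // Same substitution as KafkaRedactionUtil.redactJaasParam.
  def redactJaasParam(param: String): String =
    if (param != null && !param.isEmpty) {
      param.replaceAll("password=\".*\"", s"""password="$REDACTION_REPLACEMENT_TEXT"""")
    } else {
      param
    }

  val jaas = "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "tokenauth=true username=\"tokenId-42\" password=\"tokenPassword-42\";"

  // username stays visible, the password value is replaced:
  println(redactJaasParam(jaas))
}
```

One caveat worth knowing: the `.*` in the pattern is greedy, so if more quoted fields followed the password in the string, the match would extend to the last double quote.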
KafkaTokenUtil.scala
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] object KafkaTokenUtil extends Logging {
   val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
@@ -193,7 +194,7 @@ private[spark] object KafkaTokenUtil extends Logging {
       | debug=${isGlobalKrbDebugEnabled()}
       | useTicketCache=true
       | serviceName="${clusterConf.kerberosServiceName}";
-    """.stripMargin.replace("\n", "")
+    """.stripMargin.replace("\n", "").trim
Comment from the author: This is an unrelated fix: the generated string had trailing spaces at the end, and this change removes them. Worth mentioning that the code works both ways; it was just ugly. (A short sketch of the trailing-space behavior follows this hunk.)
logDebug(s"Krb ticket cache JAAS params: $params")
params
}
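As the comment says, the fix is cosmetic. A standalone sketch of why the trailing spaces appear (JAAS content invented): the indentation before the closing triple quote carries no `|` margin character, so `stripMargin` keeps it, and once the newlines are removed it becomes a trailing run of blanks.

```scala
object TrimSketch extends App {
  val params =
    """
    |com.sun.security.auth.module.Krb5LoginModule required
    | useTicketCache=true
    | serviceName="kafka";
    """.stripMargin.replace("\n", "")

  // The last line before the closing quotes is bare indentation without a
  // margin character, so it survives stripMargin and becomes trailing spaces.
  assert(params.endsWith("    "))     // four trailing spaces survive
  assert(!params.trim.endsWith(" "))  // .trim removes them
  println(s"[${params.trim}]")
}
```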
@@ -226,7 +227,8 @@ private[spark] object KafkaTokenUtil extends Logging {
     logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
       "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
     val tokenInfo = token.tokenInfo
-    logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+    logDebug("%-15s %-15s %-15s %-25s %-15s %-15s %-15s".format(
+      REDACTION_REPLACEMENT_TEXT,
       tokenInfo.tokenId,
       tokenInfo.owner,
       tokenInfo.renewersAsString,
@@ -268,8 +270,8 @@ private[spark] object KafkaTokenUtil extends Logging {
       | serviceName="${clusterConf.kerberosServiceName}"
       | username="$username"
       | password="$password";
-    """.stripMargin.replace("\n", "")
-    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
+    """.stripMargin.replace("\n", "").trim
Comment from the author: Same unrelated fix as above: the generated string had trailing spaces at the end, and this change removes them. The code works both ways; it was just ugly.
logDebug(s"Scram JAAS params: ${KafkaRedactionUtil.redactJaasParam(params)}")

params
}
Expand Down
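The `printToken` hunk above drops the hard-coded `[hidden]` literal from the format string and passes the shared `REDACTION_REPLACEMENT_TEXT` constant as a format argument instead, so redacted output looks the same everywhere. A standalone sketch of that column-formatted debug line (headers and values invented, constant assumed):

```scala
object TokenTableSketch extends App {
  val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"  // assumed Spark constant

  println("%-15s %-30s %-15s".format("TOKENID", "HMAC", "OWNER"))
  // The secret column is filled from the shared constant rather than a literal:
  println("%-15s %-30s %-15s".format("token-42", REDACTION_REPLACEMENT_TEXT, "alice"))
}
```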
KafkaConfigUpdaterSuite.scala
@@ -23,13 +23,10 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.spark.SparkFunSuite
 
 class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
-  private val identifier = "cluster1"
-  private val tokenService = KafkaTokenUtil.getTokenService(identifier)
   private val testModule = "testModule"
   private val testKey = "testKey"
   private val testValue = "testValue"
   private val otherTestValue = "otherTestValue"
-  private val bootStrapServers = "127.0.0.1:0"
 
   test("set should always set value") {
     val params = Map.empty[String, String]
@@ -81,10 +78,10 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
     )
     setSparkEnv(
       Map(
-        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers
+        s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
       )
     )
-    addTokenToUGI(tokenService)
+    addTokenToUGI(tokenService1)
 
     val updatedParams = KafkaConfigUpdater(testModule, params)
       .setAuthenticationConfigIfNeeded()
@@ -103,11 +100,11 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
     )
     setSparkEnv(
       Map(
-        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers,
-        s"spark.kafka.clusters.$identifier.sasl.token.mechanism" -> "intentionally_invalid"
+        s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers,
+        s"spark.kafka.clusters.$identifier1.sasl.token.mechanism" -> "intentionally_invalid"
       )
     )
-    addTokenToUGI(tokenService)
+    addTokenToUGI(tokenService1)
 
     val e = intercept[IllegalArgumentException] {
       KafkaConfigUpdater(testModule, params)
KafkaDelegationTokenTest.scala
@@ -40,6 +40,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
   protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
   protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString
 
+  protected val identifier1 = "cluster1"
+  protected val identifier2 = "cluster2"
+  protected val tokenService1 = KafkaTokenUtil.getTokenService(identifier1)
+  protected val tokenService2 = KafkaTokenUtil.getTokenService(identifier2)
+  protected val bootStrapServers = "127.0.0.1:0"
+  protected val matchingTargetServersRegex = "127.0.0.*:0"
+  protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0"
+  protected val trustStoreLocation = "/path/to/trustStore"
+  protected val trustStorePassword = "trustStoreSecret"
+  protected val keyStoreLocation = "/path/to/keyStore"
+  protected val keyStorePassword = "keyStoreSecret"
+  protected val keyPassword = "keySecret"
+  protected val keytab = "/path/to/keytab"
+  protected val principal = "[email protected]"
+
   private class KafkaJaasConfiguration extends Configuration {
     val entry =
       new AppConfigurationEntry(
@@ -89,4 +104,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
   }
+
+  protected def createClusterConf(
+      identifier: String,
+      securityProtocol: String): KafkaTokenClusterConf = {
+    KafkaTokenClusterConf(
+      identifier,
+      bootStrapServers,
+      KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX,
+      securityProtocol,
+      KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME,
+      Some(trustStoreLocation),
+      Some(trustStorePassword),
+      Some(keyStoreLocation),
+      Some(keyStorePassword),
+      Some(keyPassword),
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+  }
 }
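The point of this test change is to hoist shared constants and the `createClusterConf` builder out of individual suites into the mixin trait, so `KafkaConfigUpdaterSuite` above and `KafkaRedactionUtilSuite` below use one set of fixtures. A minimal self-contained analogue of the pattern (all names invented):

```scala
// Shared fixtures live in a trait; every suite that mixes it in gets them.
trait ClusterFixtures {
  protected val identifier1 = "cluster1"
  protected val bootStrapServers = "127.0.0.1:0"

  // Stand-in for a shared builder like createClusterConf.
  protected def clusterLabel(identifier: String): String =
    s"$identifier@$bootStrapServers"
}

object FixtureDemo extends App with ClusterFixtures {
  println(clusterLabel(identifier1))  // cluster1@127.0.0.1:0
}
```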
KafkaRedactionUtilSuite.scala (new file)
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
+import org.apache.kafka.common.serialization.StringDeserializer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
+
+class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
+  test("redactParams should give back empty parameters") {
+    setSparkEnv(Map.empty)
+    assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
+  }
+
+  test("redactParams should give back null value") {
+    setSparkEnv(Map.empty)
+    val kafkaParams = Seq(
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> null
+    )
+
+    assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams)
+  }
+
+  test("redactParams should redact non String parameters") {
+    setSparkEnv(
+      Map(
+        SECRET_REDACTION_PATTERN.key -> ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
+      )
+    )
+    val kafkaParams = Seq(
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams.size === 1)
+    assert(redactedParams.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).get
+      === REDACTION_REPLACEMENT_TEXT)
+  }
+
+  test("redactParams should redact token password from parameters") {
+    setSparkEnv(Map.empty)
+    val groupId = "id-" + ju.UUID.randomUUID().toString
+    addTokenToUGI(tokenService1)
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
+    val kafkaParams = Seq(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      SaslConfigs.SASL_JAAS_CONFIG -> jaasParams
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams.size === 2)
+    assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get === groupId)
+    val redactedJaasParams = redactedParams.get(SaslConfigs.SASL_JAAS_CONFIG).get
+    assert(redactedJaasParams.contains(tokenId))
+    assert(!redactedJaasParams.contains(tokenPassword))
+  }
+
+  test("redactParams should redact passwords from parameters") {
+    setSparkEnv(Map.empty)
+    val groupId = "id-" + ju.UUID.randomUUID().toString
+    val kafkaParams = Seq(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> trustStorePassword,
+      SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> keyStorePassword,
+      SslConfigs.SSL_KEY_PASSWORD_CONFIG -> keyPassword
+    )
+
+    val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
+
+    assert(redactedParams.size === 4)
+    assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG) === groupId)
+    assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
+    assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
+  }
+
+  test("redactJaasParam should give back null") {
+    assert(KafkaRedactionUtil.redactJaasParam(null) === null)
+  }
+
+  test("redactJaasParam should give back empty string") {
+    assert(KafkaRedactionUtil.redactJaasParam("") === "")
+  }
+
+  test("redactJaasParam should redact token password") {
+    addTokenToUGI(tokenService1)
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
+
+    val redactedJaasParams = KafkaRedactionUtil.redactJaasParam(jaasParams)
+
+    assert(redactedJaasParams.contains(tokenId))
+    assert(!redactedJaasParams.contains(tokenPassword))
+  }
+}