Skip to content

Commit 67aa337

Browse files
committed
Redact non String params as well + added size assertion
1 parent c591632 commit 67aa337

File tree

2 files changed

+40
-28
lines changed

2 files changed

+40
-28
lines changed

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,18 @@ import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
2525
import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
2626

2727
private[spark] object KafkaRedactionUtil extends Logging {
28-
private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, Object)] = {
28+
private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, String)] = {
2929
val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN))
3030
params.map { case (key, value) =>
31-
if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
32-
(key, redactJaasParam(value.asInstanceOf[String]))
33-
} else {
34-
value match {
35-
case s: String =>
36-
val (_, newValue) = redact(redactionPattern, Seq((key, s))).head
37-
(key, newValue)
38-
39-
case _ =>
40-
(key, value)
31+
if (value != null) {
32+
if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
33+
(key, redactJaasParam(value.asInstanceOf[String]))
34+
} else {
35+
val (_, newValue) = redact(redactionPattern, Seq((key, value.toString))).head
36+
(key, newValue)
4137
}
38+
} else {
39+
(key, value.asInstanceOf[String])
4240
}
4341
}
4442
}

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,43 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
2525
import org.apache.kafka.common.serialization.StringDeserializer
2626

2727
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
2829
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
2930

3031
class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
3132
test("redactParams should give back empty parameters") {
32-
setSparkEnv(Map())
33+
setSparkEnv(Map.empty)
3334
assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
3435
}
3536

36-
test("redactParams should give back non String parameters") {
37-
setSparkEnv(Map())
37+
test("redactParams should give back null value") {
38+
setSparkEnv(Map.empty)
3839
val kafkaParams = Seq(
39-
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
40+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> null
4041
)
4142

4243
assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams)
4344
}
4445

46+
test("redactParams should redact non String parameters") {
47+
setSparkEnv(
48+
Map(
49+
SECRET_REDACTION_PATTERN.key -> ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
50+
)
51+
)
52+
val kafkaParams = Seq(
53+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
54+
)
55+
56+
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
57+
58+
assert(redactedParams.size === 1)
59+
assert(redactedParams.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).get
60+
=== REDACTION_REPLACEMENT_TEXT)
61+
}
62+
4563
test("redactParams should redact token password from parameters") {
46-
setSparkEnv(Map())
64+
setSparkEnv(Map.empty)
4765
val groupId = "id-" + ju.UUID.randomUUID().toString
4866
addTokenToUGI(tokenService1)
4967
val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
@@ -55,16 +73,15 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes
5573

5674
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
5775

58-
assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get.asInstanceOf[String]
59-
=== groupId)
76+
assert(redactedParams.size === 2)
77+
assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get === groupId)
6078
val redactedJaasParams = redactedParams.get(SaslConfigs.SASL_JAAS_CONFIG).get
61-
.asInstanceOf[String]
6279
assert(redactedJaasParams.contains(tokenId))
6380
assert(!redactedJaasParams.contains(tokenPassword))
6481
}
6582

6683
test("redactParams should redact passwords from parameters") {
67-
setSparkEnv(Map())
84+
setSparkEnv(Map.empty)
6885
val groupId = "id-" + ju.UUID.randomUUID().toString
6986
val kafkaParams = Seq(
7087
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
@@ -75,14 +92,11 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes
7592

7693
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
7794

78-
assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
79-
=== groupId)
80-
assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).asInstanceOf[String]
81-
=== REDACTION_REPLACEMENT_TEXT)
82-
assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[String]
83-
=== REDACTION_REPLACEMENT_TEXT)
84-
assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG).asInstanceOf[String]
85-
=== REDACTION_REPLACEMENT_TEXT)
95+
assert(redactedParams.size === 4)
96+
assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG) === groupId)
97+
assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
98+
assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
99+
assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
86100
}
87101

88102
test("redactJaasParam should give back null") {

0 commit comments

Comments
 (0)