Skip to content

Commit 2e85c4f

Browse files
committed
Merge pull request #20360 from dreis2211
* pr/20360: Add security.protocol to KafkaProperties Closes gh-20360
2 parents 521143a + 7924dd6 commit 2e85c4f

File tree

2 files changed

+85
-9
lines changed

2 files changed

+85
-9
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,6 +91,8 @@ public class KafkaProperties {
9191

9292
private final Template template = new Template();
9393

94+
private final Security security = new Security();
95+
9496
public List<String> getBootstrapServers() {
9597
return this.bootstrapServers;
9698
}
@@ -143,6 +145,10 @@ public Template getTemplate() {
143145
return this.template;
144146
}
145147

148+
public Security getSecurity() {
149+
return this.security;
150+
}
151+
146152
private Map<String, Object> buildCommonProperties() {
147153
Map<String, Object> properties = new HashMap<>();
148154
if (this.bootstrapServers != null) {
@@ -152,6 +158,7 @@ private Map<String, Object> buildCommonProperties() {
152158
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
153159
}
154160
properties.putAll(this.ssl.buildProperties());
161+
properties.putAll(this.security.buildProperties());
155162
if (!CollectionUtils.isEmpty(this.properties)) {
156163
properties.putAll(this.properties);
157164
}
@@ -217,6 +224,8 @@ public static class Consumer {
217224

218225
private final Ssl ssl = new Ssl();
219226

227+
private final Security security = new Security();
228+
220229
/**
221230
* Frequency with which the consumer offsets are auto-committed to Kafka if
222231
* 'enable.auto.commit' is set to true.
@@ -297,6 +306,10 @@ public Ssl getSsl() {
297306
return this.ssl;
298307
}
299308

309+
public Security getSecurity() {
310+
return this.security;
311+
}
312+
300313
public Duration getAutoCommitInterval() {
301314
return this.autoCommitInterval;
302315
}
@@ -426,7 +439,7 @@ public Map<String, Object> buildProperties() {
426439
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
427440
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
428441
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
429-
return properties.with(this.ssl, this.properties);
442+
return properties.with(this.ssl, this.security, this.properties);
430443
}
431444

432445
}
@@ -435,6 +448,8 @@ public static class Producer {
435448

436449
private final Ssl ssl = new Ssl();
437450

451+
private final Security security = new Security();
452+
438453
/**
439454
* Number of acknowledgments the producer requires the leader to have received
440455
* before considering a request complete.
@@ -498,6 +513,10 @@ public Ssl getSsl() {
498513
return this.ssl;
499514
}
500515

516+
public Security getSecurity() {
517+
return this.security;
518+
}
519+
501520
public String getAcks() {
502521
return this.acks;
503522
}
@@ -595,7 +614,7 @@ public Map<String, Object> buildProperties() {
595614
map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
596615
map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
597616
map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
598-
return properties.with(this.ssl, this.properties);
617+
return properties.with(this.ssl, this.security, this.properties);
599618
}
600619

601620
}
@@ -604,6 +623,8 @@ public static class Admin {
604623

605624
private final Ssl ssl = new Ssl();
606625

626+
private final Security security = new Security();
627+
607628
/**
608629
* ID to pass to the server when making requests. Used for server-side logging.
609630
*/
@@ -623,6 +644,10 @@ public Ssl getSsl() {
623644
return this.ssl;
624645
}
625646

647+
public Security getSecurity() {
648+
return this.security;
649+
}
650+
626651
public String getClientId() {
627652
return this.clientId;
628653
}
@@ -647,7 +672,7 @@ public Map<String, Object> buildProperties() {
647672
Properties properties = new Properties();
648673
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
649674
map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
650-
return properties.with(this.ssl, this.properties);
675+
return properties.with(this.ssl, this.security, this.properties);
651676
}
652677

653678
}
@@ -659,6 +684,8 @@ public static class Streams {
659684

660685
private final Ssl ssl = new Ssl();
661686

687+
private final Security security = new Security();
688+
662689
/**
663690
* Kafka streams application.id property; default spring.application.name.
664691
*/
@@ -705,6 +732,10 @@ public Ssl getSsl() {
705732
return this.ssl;
706733
}
707734

735+
public Security getSecurity() {
736+
return this.security;
737+
}
738+
708739
public String getApplicationId() {
709740
return this.applicationId;
710741
}
@@ -775,7 +806,7 @@ public Map<String, Object> buildProperties() {
775806
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
776807
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
777808
map.from(this::getStateDir).to(properties.in("state.dir"));
778-
return properties.with(this.ssl, this.properties);
809+
return properties.with(this.ssl, this.security, this.properties);
779810
}
780811

781812
}
@@ -1167,15 +1198,40 @@ public void setOptions(Map<String, String> options) {
11671198

11681199
}
11691200

1201+
public static class Security {
1202+
1203+
/**
1204+
* Security protocol used to communicate with brokers.
1205+
*/
1206+
private String protocol;
1207+
1208+
public String getProtocol() {
1209+
return this.protocol;
1210+
}
1211+
1212+
public void setProtocol(String protocol) {
1213+
this.protocol = protocol;
1214+
}
1215+
1216+
public Map<String, Object> buildProperties() {
1217+
Properties properties = new Properties();
1218+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
1219+
map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
1220+
return properties;
1221+
}
1222+
1223+
}
1224+
11701225
@SuppressWarnings("serial")
11711226
private static class Properties extends HashMap<String, Object> {
11721227

11731228
<V> java.util.function.Consumer<V> in(String key) {
11741229
return (value) -> put(key, value);
11751230
}
11761231

1177-
Properties with(Ssl ssl, Map<String, String> properties) {
1232+
Properties with(Ssl ssl, Security security, Map<String, String> properties) {
11781233
putAll(ssl.buildProperties());
1234+
putAll(security.buildProperties());
11791235
putAll(properties);
11801236
return this;
11811237
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import javax.security.auth.login.AppConfigurationEntry;
2727

28+
import org.apache.kafka.clients.CommonClientConfigs;
2829
import org.apache.kafka.clients.admin.AdminClientConfig;
2930
import org.apache.kafka.clients.consumer.ConsumerConfig;
3031
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -106,6 +107,7 @@ void consumerProperties() {
106107
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
107108
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
108109
"spring.kafka.consumer.isolation-level = read-committed",
110+
"spring.kafka.consumer.security.protocol = SSL",
109111
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
110112
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
111113
.run((context) -> {
@@ -137,6 +139,7 @@ void consumerProperties() {
137139
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
138140
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
139141
.isEqualTo(LongDeserializer.class);
142+
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
140143
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
141144
.isEqualTo(IntegerDeserializer.class);
142145
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
@@ -156,7 +159,7 @@ void producerProperties() {
156159
"spring.kafka.producer.buffer-memory=4KB", "spring.kafka.producer.compression-type=gzip",
157160
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
158161
"spring.kafka.producer.retries=2", "spring.kafka.producer.properties.fiz.buz=fix.fox",
159-
"spring.kafka.producer.ssl.key-password=p4",
162+
"spring.kafka.producer.security.protocol=SSL", "spring.kafka.producer.ssl.key-password=p4",
160163
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
161164
"spring.kafka.producer.ssl.key-store-password=p5", "spring.kafka.producer.ssl.key-store-type=PKCS12",
162165
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
@@ -177,6 +180,7 @@ void producerProperties() {
177180
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(4096L);
178181
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
179182
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
183+
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
180184
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
181185
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
182186
.endsWith(File.separator + "ksLocP");
@@ -202,7 +206,7 @@ void adminProperties() {
202206
this.contextRunner
203207
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
204208
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
205-
"spring.kafka.admin.ssl.key-password=p4",
209+
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4",
206210
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
207211
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
208212
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
@@ -214,6 +218,7 @@ void adminProperties() {
214218
// common
215219
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
216220
// admin
221+
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
217222
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
218223
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
219224
.endsWith(File.separator + "ksLocP");
@@ -240,7 +245,7 @@ void streamsProperties() {
240245
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cache-max-size-buffering=1KB",
241246
"spring.kafka.streams.client-id=override", "spring.kafka.streams.properties.fiz.buz=fix.fox",
242247
"spring.kafka.streams.replication-factor=2", "spring.kafka.streams.state-dir=/tmp/state",
243-
"spring.kafka.streams.ssl.key-password=p7",
248+
"spring.kafka.streams.security.protocol=SSL", "spring.kafka.streams.ssl.key-password=p7",
244249
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
245250
"spring.kafka.streams.ssl.key-store-password=p8", "spring.kafka.streams.ssl.key-store-type=PKCS12",
246251
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
@@ -255,6 +260,7 @@ void streamsProperties() {
255260
assertThat(configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)).isEqualTo(1024);
256261
assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)).isEqualTo("override");
257262
assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).isEqualTo(2);
263+
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
258264
assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)).isEqualTo("/tmp/state");
259265
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p7");
260266
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -569,6 +575,20 @@ void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
569575
});
570576
}
571577

578+
@Test
579+
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
580+
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",
581+
"spring.kafka.admin.security.protocol=PLAINTEXT").run((context) -> {
582+
DefaultKafkaProducerFactory<?, ?> producerFactory = context
583+
.getBean(DefaultKafkaProducerFactory.class);
584+
Map<String, Object> producerConfigs = producerFactory.getConfigurationProperties();
585+
assertThat(producerConfigs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
586+
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
587+
Map<String, Object> configs = admin.getConfig();
588+
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("PLAINTEXT");
589+
});
590+
}
591+
572592
@Configuration(proxyBeanMethods = false)
573593
static class MessageConverterConfiguration {
574594

0 commit comments

Comments
 (0)