Skip to content

Commit 6d396b9

Browse files
nklmishphilwebb
authored andcommitted
Add Kafka transaction support property
Add `spring.kafka.producer.transaction-id-prefix` property that will be passed to `DefaultKafkaProducerFactory.setTransactionIdPrefix(...)` See gh-11076
1 parent 8d8357e commit 6d396b9

File tree

3 files changed

+38
-1
lines changed

3 files changed

+38
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
import org.springframework.kafka.support.LoggingProducerListener;
3939
import org.springframework.kafka.support.ProducerListener;
4040
import org.springframework.kafka.support.converter.RecordMessageConverter;
41+
import org.springframework.kafka.transaction.KafkaTransactionManager;
4142

4243
/**
4344
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
4445
*
4546
* @author Gary Russell
4647
* @author Stephane Nicoll
4748
* @author Eddú Meléndez
49+
* @author Nakul Mishra
4850
* @since 1.5.0
4951
*/
5052
@Configuration
@@ -94,8 +96,20 @@ public ProducerListener<Object, Object> kafkaProducerListener() {
9496
@Bean
9597
@ConditionalOnMissingBean(ProducerFactory.class)
9698
public ProducerFactory<?, ?> kafkaProducerFactory() {
97-
return new DefaultKafkaProducerFactory<>(
99+
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
98100
this.properties.buildProducerProperties());
101+
KafkaProperties.Producer producer = this.properties.getProducer();
102+
if (producer.getTransactionIdPrefix() != null) {
103+
factory.setTransactionIdPrefix(producer.getTransactionIdPrefix());
104+
}
105+
return factory;
106+
}
107+
108+
@Bean
109+
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
110+
@ConditionalOnMissingBean
111+
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
112+
return new KafkaTransactionManager<>(producerFactory);
99113
}
100114

101115
@Bean

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* @author Gary Russell
4646
* @author Stephane Nicoll
4747
* @author Artem Bilan
48+
* @author Nakul Mishra
4849
* @since 1.5.0
4950
*/
5051
@ConfigurationProperties(prefix = "spring.kafka")
@@ -519,6 +520,11 @@ public static class Producer {
519520
*/
520521
private Integer retries;
521522

523+
/**
524+
* When non empty, enables transactional support for producer.
525+
*/
526+
private String transactionIdPrefix;
527+
522528
/**
523529
* Additional producer-specific properties used to configure the client.
524530
*/
@@ -600,6 +606,14 @@ public void setRetries(Integer retries) {
600606
this.retries = retries;
601607
}
602608

609+
public String getTransactionIdPrefix() {
610+
return this.transactionIdPrefix;
611+
}
612+
613+
public void setTransactionIdPrefix(String transactionIdPrefix) {
614+
this.transactionIdPrefix = transactionIdPrefix;
615+
}
616+
603617
public Map<String, String> getProperties() {
604618
return this.properties;
605619
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4949
import org.springframework.kafka.support.converter.RecordMessageConverter;
5050
import org.springframework.kafka.test.utils.KafkaTestUtils;
51+
import org.springframework.kafka.transaction.KafkaTransactionManager;
5152

5253
import static org.assertj.core.api.Assertions.assertThat;
5354
import static org.assertj.core.api.Assertions.entry;
@@ -59,6 +60,7 @@
5960
* @author Gary Russell
6061
* @author Stephane Nicoll
6162
* @author Eddú Meléndez
63+
* @author Nakul Mishra
6264
*/
6365
public class KafkaAutoConfigurationTests {
6466

@@ -198,6 +200,9 @@ public void producerProperties() {
198200
assertThat(
199201
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
200202
.isEmpty();
203+
assertThat(
204+
context.getBeansOfType(KafkaTransactionManager.class))
205+
.isEmpty();
201206
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
202207
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
203208
});
@@ -256,6 +261,7 @@ public void listenerProperties() {
256261
"spring.kafka.listener.poll-timeout=2000",
257262
"spring.kafka.listener.type=batch",
258263
"spring.kafka.jaas.enabled=true",
264+
"spring.kafka.producer.transaction-id-prefix=foo",
259265
"spring.kafka.jaas.login-module=foo",
260266
"spring.kafka.jaas.control-flag=REQUISITE",
261267
"spring.kafka.jaas.options.useKeyTab=true")
@@ -297,6 +303,9 @@ public void listenerProperties() {
297303
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
298304
assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo(
299305
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
306+
assertThat(
307+
context.getBeansOfType(KafkaTransactionManager.class))
308+
.hasSize(1);
300309
assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
301310
.containsExactly(entry("useKeyTab", "true"));
302311
});

0 commit comments

Comments
 (0)