Skip to content

Commit 1b8f955

Browse files
svcacct-epo-cicdwilkinsona
authored andcommitted
Add config property for Kafka consumer isolation level
See gh-17389
1 parent 0d124e9 commit 1b8f955

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ public static class Consumer {
266266
*/
267267
private Duration heartbeatInterval;
268268

269+
/**
270+
* Controls how transactional messages are returned when polling the broker
271+
* (non-transactional messages will be unconditionally returned, regardless of
272+
* this setting).
273+
*/
274+
private String isolationLevel;
275+
269276
/**
270277
* Deserializer class for keys.
271278
*/
@@ -362,6 +369,14 @@ public void setHeartbeatInterval(Duration heartbeatInterval) {
362369
this.heartbeatInterval = heartbeatInterval;
363370
}
364371

372+
public String getIsolationLevel() {
373+
return this.isolationLevel;
374+
}
375+
376+
public void setIsolationLevel(String isolationLevel) {
377+
this.isolationLevel = isolationLevel;
378+
}
379+
365380
public Class<?> getKeyDeserializer() {
366381
return this.keyDeserializer;
367382
}
@@ -406,6 +421,7 @@ public Map<String, Object> buildProperties() {
406421
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
407422
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
408423
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
424+
map.from(this::getIsolationLevel).to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
409425
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
410426
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
411427
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));

spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2228,6 +2228,24 @@
22282228
}
22292229
]
22302230
},
2231+
{
2232+
"name": "spring.kafka.consumer.isolation-level",
2233+
"values": [
2234+
{
2235+
"value": "read_committed",
2236+
"description": "Only consume transactional messages that have been committed."
2237+
},
2238+
{
2239+
"value": "read_uncommitted",
2240+
"description": "Consume all transactional messages (even those that have been aborted)."
2241+
}
2242+
],
2243+
"providers": [
2244+
{
2245+
"name": "any"
2246+
}
2247+
]
2248+
},
22312249
{
22322250
"name": "spring.kafka.producer.key-serializer",
22332251
"providers": [

0 commit comments

Comments
 (0)