Skip to content

Commit b39479b

Browse files
committed
Polish "Add config property for Kafka consumer isolation level"
See gh-17389
1 parent 1b8f955 commit b39479b

File tree

3 files changed

+10
-25
lines changed

3 files changed

+10
-25
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.List;
26+
import java.util.Locale;
2627
import java.util.Map;
2728

2829
import org.apache.kafka.clients.CommonClientConfigs;
2930
import org.apache.kafka.clients.consumer.ConsumerConfig;
3031
import org.apache.kafka.clients.producer.ProducerConfig;
3132
import org.apache.kafka.common.config.SslConfigs;
33+
import org.apache.kafka.common.requests.IsolationLevel;
3234
import org.apache.kafka.common.serialization.StringDeserializer;
3335
import org.apache.kafka.common.serialization.StringSerializer;
3436

@@ -267,11 +269,9 @@ public static class Consumer {
267269
private Duration heartbeatInterval;
268270

269271
/**
270-
* Controls how transactional messages are returned when polling the broker
271-
* (non-transactional messages will be unconditionally returned, regardless of
272-
* this setting).
272+
* Isolation level for reading messages that have been written transactionally.
273273
*/
274-
private String isolationLevel;
274+
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
275275

276276
/**
277277
* Deserializer class for keys.
@@ -369,11 +369,11 @@ public void setHeartbeatInterval(Duration heartbeatInterval) {
369369
this.heartbeatInterval = heartbeatInterval;
370370
}
371371

372-
public String getIsolationLevel() {
372+
public IsolationLevel getIsolationLevel() {
373373
return this.isolationLevel;
374374
}
375375

376-
public void setIsolationLevel(String isolationLevel) {
376+
public void setIsolationLevel(IsolationLevel isolationLevel) {
377377
this.isolationLevel = isolationLevel;
378378
}
379379

@@ -421,7 +421,8 @@ public Map<String, Object> buildProperties() {
421421
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
422422
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
423423
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
424-
map.from(this::getIsolationLevel).to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
424+
map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
425+
.to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
425426
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
426427
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
427428
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));

spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2228,24 +2228,6 @@
22282228
}
22292229
]
22302230
},
2231-
{
2232-
"name": "spring.kafka.consumer.isolation-level",
2233-
"values": [
2234-
{
2235-
"value": "read_committed",
2236-
"description": "Only consume transactional messages that have been committed."
2237-
},
2238-
{
2239-
"value": "read_uncommitted",
2240-
"description": "Consume all transactional messages (even those that have been aborted)."
2241-
}
2242-
],
2243-
"providers": [
2244-
{
2245-
"name": "any"
2246-
}
2247-
]
2248-
},
22492231
{
22502232
"name": "spring.kafka.producer.key-serializer",
22512233
"providers": [

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ void consumerProperties() {
105105
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456",
106106
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
107107
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
108+
"spring.kafka.consumer.isolation-level = read-committed",
108109
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
109110
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
110111
.run((context) -> {
@@ -133,6 +134,7 @@ void consumerProperties() {
133134
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024);
134135
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
135136
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
137+
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
136138
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
137139
.isEqualTo(LongDeserializer.class);
138140
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))

0 commit comments

Comments
 (0)