Skip to content

Commit dde79e5

Browse files
committed
Merge pull request #17389 from sparty02
* gh-17389: Polish "Add config property for Kafka consumer isolation level" Add config property for Kafka consumer isolation level Closes gh-17389
2 parents 0d124e9 + b39479b commit dde79e5

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.List;
26+
import java.util.Locale;
2627
import java.util.Map;
2728

2829
import org.apache.kafka.clients.CommonClientConfigs;
2930
import org.apache.kafka.clients.consumer.ConsumerConfig;
3031
import org.apache.kafka.clients.producer.ProducerConfig;
3132
import org.apache.kafka.common.config.SslConfigs;
33+
import org.apache.kafka.common.requests.IsolationLevel;
3234
import org.apache.kafka.common.serialization.StringDeserializer;
3335
import org.apache.kafka.common.serialization.StringSerializer;
3436

@@ -266,6 +268,11 @@ public static class Consumer {
266268
*/
267269
private Duration heartbeatInterval;
268270

271+
/**
272+
* Isolation level for reading messages that have been written transactionally.
273+
*/
274+
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
275+
269276
/**
270277
* Deserializer class for keys.
271278
*/
@@ -362,6 +369,14 @@ public void setHeartbeatInterval(Duration heartbeatInterval) {
362369
this.heartbeatInterval = heartbeatInterval;
363370
}
364371

372+
public IsolationLevel getIsolationLevel() {
373+
return this.isolationLevel;
374+
}
375+
376+
public void setIsolationLevel(IsolationLevel isolationLevel) {
377+
this.isolationLevel = isolationLevel;
378+
}
379+
365380
public Class<?> getKeyDeserializer() {
366381
return this.keyDeserializer;
367382
}
@@ -406,6 +421,8 @@ public Map<String, Object> buildProperties() {
406421
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
407422
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
408423
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
424+
map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
425+
.to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
409426
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
410427
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
411428
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ void consumerProperties() {
105105
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456",
106106
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
107107
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
108+
"spring.kafka.consumer.isolation-level = read-committed",
108109
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
109110
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
110111
.run((context) -> {
@@ -133,6 +134,7 @@ void consumerProperties() {
133134
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024);
134135
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
135136
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
137+
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
136138
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
137139
.isEqualTo(LongDeserializer.class);
138140
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))

0 commit comments

Comments
 (0)