Skip to content

Commit f527f3c

Browse files
Fix Defects in DefaultKafkaProducerFactory#updateConfigs() (#2897)
1 parent a420ba0 commit f527f3c

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -626,23 +626,31 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
626626

627627
@Override
628628
public void updateConfigs(Map<String, Object> updates) {
629-
updates.entrySet().forEach(entry -> {
630-
if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
631-
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
632-
+ "' must be a String, not a " + entry.getClass().getName());
633-
Assert.isTrue(this.transactionIdPrefix != null
634-
? entry.getValue() != null
635-
: entry.getValue() == null,
636-
"Cannot change transactional capability");
637-
this.transactionIdPrefix = (String) entry.getValue();
629+
updates.forEach((key, value) -> {
630+
if (key == null) {
631+
return;
638632
}
639-
else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) {
640-
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG
641-
+ "' must be a String, not a " + entry.getClass().getName());
642-
this.clientIdPrefix = (String) entry.getValue();
633+
if (key.equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
634+
Assert.isTrue(
635+
value == null || value instanceof String,
636+
() -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
637+
+ "' must be null or a String, not a " + value.getClass().getName()
638+
);
639+
Assert.isTrue(
640+
(this.transactionIdPrefix != null) == (value != null),
641+
"Cannot change transactional capability"
642+
);
643+
this.transactionIdPrefix = (String) value;
643644
}
644-
else {
645-
this.configs.put(entry.getKey(), entry.getValue());
645+
else if (key.equals(ProducerConfig.CLIENT_ID_CONFIG)) {
646+
Assert.isTrue(
647+
value == null || value instanceof String,
648+
() -> "'" + ProducerConfig.CLIENT_ID_CONFIG
649+
+ "' must be null or a String, not a " + value.getClass().getName());
650+
this.clientIdPrefix = (String) value;
651+
}
652+
else if (value != null) {
653+
this.configs.put(key, value);
646654
}
647655
});
648656
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatCode;
2021
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2223
import static org.mockito.ArgumentMatchers.any;
@@ -545,6 +546,7 @@ void configUpdates() {
545546
assertThat(KafkaTestUtils.getPropertyValue(pf1, "transactionIdPrefix")).isEqualTo("tx2-");
546547
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
547548
assertThatIllegalArgumentException().isThrownBy(() -> pf1.updateConfigs(configs));
549+
assertThatCode(() -> pf1.updateConfigs(Collections.singletonMap(null, null))).doesNotThrowAnyException();
548550
}
549551

550552
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)