|
23 | 23 | import java.util.Iterator;
|
24 | 24 | import java.util.List;
|
25 | 25 | import java.util.Map;
|
| 26 | +import java.util.Objects; |
26 | 27 | import java.util.concurrent.BlockingQueue;
|
27 | 28 | import java.util.concurrent.ConcurrentHashMap;
|
28 | 29 | import java.util.concurrent.Future;
|
|
110 | 111 | * @author Artem Bilan
|
111 | 112 | * @author Chris Gilbert
|
112 | 113 | * @author Thomas Strauß
|
| 114 | + * @author Nathan Xu |
113 | 115 | */
|
114 | 116 | public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
|
115 | 117 | implements ProducerFactory<K, V>, ApplicationContextAware,
|
@@ -660,23 +662,31 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
|
660 | 662 |
|
661 | 663 | @Override
|
662 | 664 | public void updateConfigs(Map<String, Object> updates) {
|
663 |
| - updates.entrySet().forEach(entry -> { |
664 |
| - if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) { |
665 |
| - Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG |
666 |
| - + "' must be a String, not a " + entry.getClass().getName()); |
667 |
| - Assert.isTrue(this.transactionIdPrefix != null |
668 |
| - ? entry.getValue() != null |
669 |
| - : entry.getValue() == null, |
670 |
| - "Cannot change transactional capability"); |
671 |
| - this.transactionIdPrefix = (String) entry.getValue(); |
| 665 | + updates.forEach((key, value) -> { |
| 666 | + if (key == null) { // ConcurrentHashMap doesn't accept null key |
| 667 | + return; |
672 | 668 | }
|
673 |
| - else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) { |
674 |
| - Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG |
675 |
| - + "' must be a String, not a " + entry.getClass().getName()); |
676 |
| - this.clientIdPrefix = (String) entry.getValue(); |
| 669 | + if (key.equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) { |
| 670 | + Assert.isTrue( |
| 671 | + value == null || value instanceof String, |
| 672 | + () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG |
| 673 | + + "' must be null or a String, not a " + Objects.requireNonNull(value) |
| 674 | + .getClass() |
| 675 | + .getName() |
| 676 | + ); |
| 677 | + Assert.isTrue( |
| 678 | + (this.transactionIdPrefix != null) == (value != null), |
| 679 | + "Cannot change transactional capability" |
| 680 | + ); |
| 681 | + this.transactionIdPrefix = (String) value; |
677 | 682 | }
|
678 |
| - else { |
679 |
| - this.configs.put(entry.getKey(), entry.getValue()); |
| 683 | + else if (key.equals(ProducerConfig.CLIENT_ID_CONFIG)) { |
| 684 | + Assert.isTrue(value == null || value instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG |
| 685 | + + "' must be null or a String, not a " + Objects.requireNonNull(value).getClass().getName()); |
| 686 | + this.clientIdPrefix = (String) value; |
| 687 | + } |
| 688 | + else if (value != null) { // ConcurrentHashMap doesn't accept null value |
| 689 | + this.configs.put(key, value); |
680 | 690 | }
|
681 | 691 | });
|
682 | 692 | }
|
|
0 commit comments