diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index b96df2c88c..2ea11bf568 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -22,12 +22,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -323,41 +322,27 @@ private void updateClusterId(Admin adminClient) throws InterruptedException, Exe */ protected Collection newTopics() { Assert.state(this.applicationContext != null, "'applicationContext' cannot be null"); - Map newTopicsMap = new HashMap<>( - this.applicationContext.getBeansOfType(NewTopic.class, false, false)); - Map wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false); - AtomicInteger count = new AtomicInteger(); - wrappers.forEach((name, newTopics) -> { - newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt)); - }); - Map topicsForRetry = newTopicsMap.entrySet().stream() - .filter(entry -> entry.getValue() instanceof TopicForRetryable) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - for (Entry entry : topicsForRetry.entrySet()) { - Iterator> iterator = newTopicsMap.entrySet().iterator(); - boolean remove = false; - while (iterator.hasNext()) { - Entry nt = iterator.next(); - // if we have a NewTopic and TopicForRetry with the same name, remove the latter - if (nt.getValue().name().equals(entry.getValue().name()) - && !(nt.getValue() instanceof TopicForRetryable)) { - - remove = true; - break; - } - } - if (remove) { - newTopicsMap.remove(entry.getKey()); - } - } - Iterator> iterator = newTopicsMap.entrySet().iterator(); - while (iterator.hasNext()) { - Entry next = iterator.next(); - if (!this.createOrModifyTopic.test(next.getValue())) { - iterator.remove(); - } - } - return new ArrayList<>(newTopicsMap.values()); + + // Deal with List directly instead of Map (no need for bean names) + List newTopicsList = new ArrayList<>( + this.applicationContext.getBeansOfType(NewTopic.class, false, false).values()); + + // Add topics from NewTopics wrappers (no need for bean names either) + this.applicationContext.getBeansOfType(NewTopics.class, false, false).values() + .forEach(wrapper -> newTopicsList.addAll(wrapper.getNewTopics())); + + // Collect regular topic names to check against TopicForRetryable + Set regularTopicNames = newTopicsList.stream() + .filter(nt -> !(nt instanceof TopicForRetryable)) + .map(NewTopic::name) + .collect(Collectors.toSet()); + + // Apply combined filter: remove TopicForRetryable with same name as regular topic OR topics that don't pass predicate + newTopicsList.removeIf(nt -> + (nt instanceof TopicForRetryable && regularTopicNames.contains(nt.name())) || + !this.createOrModifyTopic.test(nt)); + + return newTopicsList; } @Override