diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index fe18836a62..c391411f8a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -171,6 +171,7 @@ * @author Lokesh Alamuri * @author Sanghyeok An * @author Christian Fredriksson + * @author Timofey Barabanov */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -625,7 +626,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Map offsets = new LinkedHashMap<>(); - private final Collection assignedPartitions = new LinkedHashSet<>(); + private final Collection assignedPartitions = Collections.synchronizedSet(new LinkedHashSet<>()); private final Map lastCommits = new HashMap<>(); @@ -1247,7 +1248,8 @@ private void subscribeOrAssignTopics(final Consumer subscr else { List topicPartitionsToAssign = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions); - this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size()); + this.definedPartitions = Collections.synchronizedMap( + new LinkedHashMap<>(topicPartitionsToAssign.size())); for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) { this.definedPartitions.put(topicPartition.getTopicPartition(), new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),