Skip to content

Commit b772ebd

Browse files
authored
GH-3726: Fix KafkaMessageListenerContainer for ConcurrentModificationException
Fixes: #3726 Issue link: #3726 `KafkaMessageListenerContainer.getAssignedPartitions()` is not safe due to the fact that different threads can iterate/modify any of the fields `partitionsListenerConsumer.definedPartitions` or `partitionsListenerConsumer.assignedPartitions` simultaneously, but collection types of these fields are not designed for such scenarios. Thus at least `ConcurrentModificationException` can be thrown. * Wrap `partitionsListenerConsumer.definedPartitions` and `partitionsListenerConsumer.assignedPartitions` into `Collections.synchronizedSet()` Signed-off-by: Tim Barabanov <[email protected]> [[email protected] Fix commit message] **Auto-cherry-pick to `3.2.x`** Signed-off-by: Artem Bilan <[email protected]>
1 parent 7aa87dd commit b772ebd

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -171,6 +171,7 @@
171171
* @author Lokesh Alamuri
172172
* @author Sanghyeok An
173173
* @author Christian Fredriksson
174+
* @author Timofey Barabanov
174175
*/
175176
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
176177
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -625,7 +626,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
625626

626627
private final Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
627628

628-
private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<>();
629+
private final Collection<TopicPartition> assignedPartitions = Collections.synchronizedSet(new LinkedHashSet<>());
629630

630631
private final Map<TopicPartition, OffsetAndMetadata> lastCommits = new HashMap<>();
631632

@@ -1247,7 +1248,8 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
12471248
else {
12481249
List<TopicPartitionOffset> topicPartitionsToAssign =
12491250
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
1250-
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
1251+
this.definedPartitions = Collections.synchronizedMap(
1252+
new LinkedHashMap<>(topicPartitionsToAssign.size()));
12511253
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
12521254
this.definedPartitions.put(topicPartition.getTopicPartition(),
12531255
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),

0 commit comments

Comments
 (0)