Skip to content

Commit 2873012

Browse files
committed
GH-522: Fix NPE in listener container
Fixes #522
1 parent 36aabb4 commit 2873012

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
9797

9898
private final TopicPartitionInitialOffset[] topicPartitions;
9999

100-
private ListenerConsumer listenerConsumer;
100+
private volatile ListenerConsumer listenerConsumer;
101101

102-
private ListenableFuture<?> listenerConsumerFuture;
102+
private volatile ListenableFuture<?> listenerConsumerFuture;
103103

104104
private GenericMessageListener<?> listener;
105105

@@ -179,11 +179,17 @@ public void setClientIdSuffix(String clientIdSuffix) {
179179
* either explicitly or by Kafka; may be null if not assigned yet.
180180
*/
181181
public Collection<TopicPartition> getAssignedPartitions() {
182-
if (this.listenerConsumer.definedPartitions != null) {
183-
return Collections.unmodifiableCollection(this.listenerConsumer.definedPartitions.keySet());
184-
}
185-
else if (this.listenerConsumer.assignedPartitions != null) {
186-
return Collections.unmodifiableCollection(this.listenerConsumer.assignedPartitions);
182+
ListenerConsumer listenerConsumer = this.listenerConsumer;
183+
if (listenerConsumer != null) {
184+
if (listenerConsumer.definedPartitions != null) {
185+
return Collections.unmodifiableCollection(listenerConsumer.definedPartitions.keySet());
186+
}
187+
else if (listenerConsumer.assignedPartitions != null) {
188+
return Collections.unmodifiableCollection(listenerConsumer.assignedPartitions);
189+
}
190+
else {
191+
return null;
192+
}
187193
}
188194
else {
189195
return null;

0 commit comments

Comments
 (0)