Skip to content

Commit 69f4e3b

Browse files
committed
Add KafkaListener support for shared consumer containers
- Add AbstractShareKafkaListenerContainerFactory base class for share consumer factories - Add ShareKafkaListenerContainerFactory concrete implementation - Add ShareRecordMessagingMessageListenerAdapter for share consumer message handling - Modify MethodKafkaListenerEndpoint to create appropriate listener adapters based on container type - Add integration tests for ShareKafkaListener functionality Signed-off-by: Soby Chacko <[email protected]>
1 parent b1d8836 commit 69f4e3b

File tree

7 files changed

+531
-6
lines changed

7 files changed

+531
-6
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
import java.util.regex.Pattern;
22+
23+
import org.apache.commons.logging.LogFactory;
24+
import org.jspecify.annotations.Nullable;
25+
26+
import org.springframework.context.ApplicationContext;
27+
import org.springframework.context.ApplicationContextAware;
28+
import org.springframework.context.ApplicationEventPublisher;
29+
import org.springframework.context.ApplicationEventPublisherAware;
30+
import org.springframework.core.log.LogAccessor;
31+
import org.springframework.kafka.core.ShareConsumerFactory;
32+
import org.springframework.kafka.listener.AbstractShareKafkaMessageListenerContainer;
33+
import org.springframework.kafka.listener.ContainerProperties;
34+
import org.springframework.kafka.support.JavaUtils;
35+
import org.springframework.kafka.support.TopicPartitionOffset;
36+
37+
/**
38+
* Base {@link KafkaListenerContainerFactory} for creating containers that use Kafka's share consumer model.
39+
* <p>
40+
* This abstract factory provides common configuration and lifecycle management for share consumer containers.
41+
* It handles the creation of containers based on endpoints, topics, or patterns, and applies common
42+
* configuration properties to the created containers.
43+
* <p>
44+
* The share consumer model enables cooperative rebalancing, allowing consumers to maintain ownership of
45+
* some partitions while relinquishing others during rebalances, which can reduce disruption compared to
46+
* the classic consumer model.
47+
*
48+
* @param <C> the container type
49+
* @param <K> the key type
50+
* @param <V> the value type
51+
*
52+
* @author Soby Chacko
53+
* @since 4.0
54+
*/
55+
public abstract class AbstractShareKafkaListenerContainerFactory<C extends AbstractShareKafkaMessageListenerContainer<K, V>, K, V>
56+
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, ApplicationContextAware {
57+
58+
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
59+
60+
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
61+
62+
private @Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
63+
64+
private @Nullable Boolean autoStartup;
65+
66+
private @Nullable Integer phase;
67+
68+
private @Nullable ApplicationEventPublisher applicationEventPublisher;
69+
70+
private @Nullable ApplicationContext applicationContext;
71+
72+
@Override
73+
public void setApplicationContext(ApplicationContext applicationContext) {
74+
this.applicationContext = applicationContext;
75+
}
76+
77+
/**
78+
* Set the share consumer factory to use for creating containers.
79+
* @param shareConsumerFactory the share consumer factory
80+
*/
81+
public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory) {
82+
this.shareConsumerFactory = shareConsumerFactory;
83+
}
84+
85+
/**
86+
* Get the share consumer factory.
87+
* @return the share consumer factory
88+
*/
89+
public @Nullable ShareConsumerFactory<? super K, ? super V> getShareConsumerFactory() {
90+
return this.shareConsumerFactory;
91+
}
92+
93+
/**
94+
* Set whether containers created by this factory should auto-start.
95+
* @param autoStartup true to auto-start
96+
*/
97+
public void setAutoStartup(Boolean autoStartup) {
98+
this.autoStartup = autoStartup;
99+
}
100+
101+
/**
102+
* Set the phase in which containers created by this factory should start and stop.
103+
* @param phase the phase
104+
*/
105+
public void setPhase(Integer phase) {
106+
this.phase = phase;
107+
}
108+
109+
@Override
110+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
111+
this.applicationEventPublisher = applicationEventPublisher;
112+
}
113+
114+
/**
115+
* Get the container properties.
116+
* @return the container properties
117+
*/
118+
public ContainerProperties getContainerProperties() {
119+
return this.containerProperties;
120+
}
121+
122+
@Override
123+
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
124+
C instance = createContainerInstance(endpoint);
125+
JavaUtils.INSTANCE
126+
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
127+
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
128+
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
129+
}
130+
endpoint.setupListenerContainer(instance, null); // No message converter for MVP
131+
initializeContainer(instance, endpoint);
132+
return instance;
133+
}
134+
135+
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> endpoint) {
136+
// Minimal configuration; can add more properties later
137+
}
138+
139+
/**
140+
* Initialize the provided container with common configuration properties.
141+
* @param instance the container instance
142+
* @param endpoint the endpoint
143+
*/
144+
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
145+
ContainerProperties properties = instance.getContainerProperties();
146+
if (this.containerProperties.getAckCount() > 0) {
147+
properties.setAckCount(this.containerProperties.getAckCount());
148+
}
149+
if (this.containerProperties.getAckTime() > 0) {
150+
properties.setAckTime(this.containerProperties.getAckTime());
151+
}
152+
if (endpoint.getAutoStartup() != null) {
153+
instance.setAutoStartup(endpoint.getAutoStartup());
154+
}
155+
else if (this.autoStartup != null) {
156+
instance.setAutoStartup(this.autoStartup);
157+
}
158+
if (this.phase != null) {
159+
instance.setPhase(this.phase);
160+
}
161+
if (this.applicationContext != null) {
162+
instance.setApplicationContext(this.applicationContext);
163+
}
164+
if (this.applicationEventPublisher != null) {
165+
instance.setApplicationEventPublisher(this.applicationEventPublisher);
166+
}
167+
if (endpoint.getGroupId() != null) {
168+
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
169+
}
170+
if (endpoint.getClientIdPrefix() != null) {
171+
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
172+
}
173+
if (endpoint.getConsumerProperties() != null) {
174+
instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties());
175+
}
176+
}
177+
178+
@Override
179+
public C createContainer(TopicPartitionOffset... topicPartitions) {
180+
return createContainerInstance(new KafkaListenerEndpointAdapter() {
181+
@Override
182+
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
183+
return Arrays.copyOf(topicPartitions, topicPartitions.length);
184+
}
185+
});
186+
}
187+
188+
@Override
189+
public C createContainer(String... topics) {
190+
return createContainerInstance(new KafkaListenerEndpointAdapter() {
191+
@Override
192+
public Collection<String> getTopics() {
193+
return Arrays.asList(topics);
194+
}
195+
});
196+
}
197+
198+
@Override
199+
public C createContainer(Pattern topicPattern) {
200+
return createContainerInstance(new KafkaListenerEndpointAdapter() {
201+
@Override
202+
public Pattern getTopicPattern() {
203+
return topicPattern;
204+
}
205+
});
206+
}
207+
208+
/**
209+
* Create a container instance for the provided endpoint.
210+
* @param endpoint the endpoint
211+
* @return the container instance
212+
*/
213+
protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);
214+
}

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import org.springframework.expression.BeanResolver;
3232
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
3333
import org.springframework.kafka.listener.MessageListenerContainer;
34+
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
3435
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
3536
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
3637
import org.springframework.kafka.listener.adapter.HandlerAdapter;
3738
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
3839
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
40+
import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter;
3941
import org.springframework.kafka.support.JavaUtils;
4042
import org.springframework.kafka.support.converter.BatchMessageConverter;
4143
import org.springframework.kafka.support.converter.MessageConverter;
@@ -175,7 +177,14 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
175177

176178
Assert.state(this.messageHandlerMethodFactory != null,
177179
"Could not create message listener - MessageHandlerMethodFactory not set");
178-
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
180+
181+
final MessagingMessageListenerAdapter<K, V> messageListener;
182+
if (container instanceof ShareKafkaMessageListenerContainer<?, ?>) {
183+
messageListener = createShareMessageListenerInstance(messageConverter);
184+
}
185+
else {
186+
messageListener = createMessageListenerInstance(messageConverter);
187+
}
179188
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
180189
JavaUtils.INSTANCE
181190
.acceptIfNotNull(getReplyTopic(), replyTopic -> {
@@ -206,6 +215,26 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
206215
* @param messageConverter the converter (may be null).
207216
* @return the {@link MessagingMessageListenerAdapter} instance.
208217
*/
218+
protected MessagingMessageListenerAdapter<K, V> createShareMessageListenerInstance(
219+
@Nullable MessageConverter messageConverter) {
220+
221+
MessagingMessageListenerAdapter<K, V> listener;
222+
ShareRecordMessagingMessageListenerAdapter<K, V> messageListener = new ShareRecordMessagingMessageListenerAdapter<>(
223+
this.bean, this.method, this.errorHandler);
224+
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
225+
messageListener.setMessageConverter(recordMessageConverter);
226+
}
227+
listener = messageListener;
228+
if (this.messagingConverter != null) {
229+
listener.setMessagingConverter(this.messagingConverter);
230+
}
231+
BeanResolver resolver = getBeanResolver();
232+
if (resolver != null) {
233+
listener.setBeanResolver(resolver);
234+
}
235+
return listener;
236+
}
237+
209238
protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
210239
@Nullable MessageConverter messageConverter) {
211240

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.Collection;
20+
21+
import org.springframework.kafka.core.ShareConsumerFactory;
22+
import org.springframework.kafka.listener.ContainerProperties;
23+
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
24+
import org.springframework.kafka.support.TopicPartitionOffset;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* A {@link KafkaListenerContainerFactory} implementation to create {@link ShareKafkaMessageListenerContainer}
29+
* instances for Kafka's share consumer model.
30+
*
31+
* @param <K> the key type
32+
* @param <V> the value type
33+
*
34+
* @author Soby Chacko
35+
* @since 4.0
36+
*/
37+
public class ShareKafkaListenerContainerFactory<K, V>
38+
extends AbstractShareKafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>, K, V> {
39+
40+
/**
41+
* Construct an instance with the provided consumer factory.
42+
* @param shareConsumerFactory the share consumer factory
43+
*/
44+
public ShareKafkaListenerContainerFactory(ShareConsumerFactory<K, V> shareConsumerFactory) {
45+
setShareConsumerFactory(shareConsumerFactory);
46+
}
47+
48+
@Override
49+
protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
50+
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
51+
if (topicPartitions != null && topicPartitions.length > 0) {
52+
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), new ContainerProperties(topicPartitions));
53+
}
54+
else {
55+
Collection<String> topics = endpoint.getTopics();
56+
Assert.state(topics != null, "'topics' must not be null");
57+
if (!topics.isEmpty()) {
58+
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
59+
new ContainerProperties(topics.toArray(new String[0])));
60+
}
61+
else {
62+
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
63+
new ContainerProperties(endpoint.getTopicPattern()));
64+
}
65+
}
66+
}
67+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.commons.logging.LogFactory;
2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.jspecify.annotations.NonNull;
2425
import org.jspecify.annotations.Nullable;
2526

2627
import org.springframework.beans.BeanUtils;
@@ -58,6 +59,9 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
5859
*/
5960
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
6061

62+
/**
63+
* The share consumer factory used to create consumer instances.
64+
*/
6165
protected final ShareConsumerFactory<K, V> shareConsumerFactory;
6266

6367
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -66,6 +70,7 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
6670

6771
protected final ReentrantLock lifecycleLock = new ReentrantLock();
6872

73+
@NonNull
6974
private String beanName = "noBeanNameSet";
7075

7176
@Nullable

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-present the original author or authors.
2+
* Copyright 2025-present the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,8 +43,6 @@
4343
* This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}.
4444
* It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop
4545
* with per-record dispatch and acknowledgement.
46-
* <p>
47-
* Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id.
4846
*
4947
* @param <K> the key type
5048
* @param <V> the value type
@@ -73,7 +71,7 @@ public class ShareKafkaMessageListenerContainer<K, V>
7371
* @param shareConsumerFactory the share consumer factory
7472
* @param containerProperties the container properties
7573
*/
76-
public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
74+
public ShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
7775
ContainerProperties containerProperties) {
7876
super(shareConsumerFactory, containerProperties);
7977
Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
@@ -152,7 +150,7 @@ private void publishConsumerStartedEvent() {
152150
}
153151

154152
/**
155-
* The inner share consumer thread: polls for records and dispatches to the listener.
153+
* The inner share consumer thread that polls for records and dispatches to the listener.
156154
*/
157155
private class ShareListenerConsumer implements Runnable {
158156

0 commit comments

Comments
 (0)