Skip to content

Conversation

sobychacko
Copy link
Contributor

  • Add AbstractShareKafkaListenerContainerFactory base class for share consumer factories
  • Add ShareKafkaListenerContainerFactory concrete implementation
  • Add ShareRecordMessagingMessageListenerAdapter for share consumer message handling
  • Modify MethodKafkaListenerEndpoint to create appropriate listener adapters based on container type
  • Add integration tests for ShareKafkaListener functionality

- Add AbstractShareKafkaListenerContainerFactory base class for share consumer factories
- Add ShareKafkaListenerContainerFactory concrete implementation
- Add ShareRecordMessagingMessageListenerAdapter for share consumer message handling
- Modify MethodKafkaListenerEndpoint to create appropriate listener adapters based on container type
- Add integration tests for ShareKafkaListener functionality

Signed-off-by: Soby Chacko <[email protected]>
@sobychacko sobychacko force-pushed the KL-shared-consumer branch from 3bcce66 to 69f4e3b Compare July 1, 2025 02:29
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know too many details about the feature, so forgive me if my review has gone sideways.

* @since 4.0
*/
public class ShareKafkaListenerContainerFactory<K, V>
extends AbstractShareKafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>, K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why do we need an abstract super class instead of combining all the logic in this class?


protected final ReentrantLock lifecycleLock = new ReentrantLock();

@NonNull
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No one ever use this annotation if we do @org.jspecify.annotations.NullMarked in the package-info.java.
That one assumes that all not marked with @Nullable properties are not null.
If you have a complain here that it might be null, then we need to revise the logic around this property.
However I believe it is good enough to mark it as @SuppressWarnings("NullAway.Init") because this property is usually initialized from the application context.

* @param containerProperties the container properties
*/
public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
public ShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have this happened?
How did that turned out that now a ShareKafkaMessageListenerContainer can be created without a factory?

*/
@Override
public void onMessage(ConsumerRecord<K, V> record) {
onMessage(record, null, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason in this class at all.
What is wrong with using RecordMessagingMessageListenerAdapter as it is?

Signed-off-by: Soby Chacko <[email protected]>
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);

final MessagingMessageListenerAdapter<K, V> messageListener;
messageListener = createMessageListenerInstance(messageConverter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revise changes in this class!
What you have done on these lines makes no sense so far.
The previous code version was much better to read, and it is identical to the logic you have so far.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya, that was an oversight. will fix it.

* Create an empty {@link MessagingMessageListenerAdapter} instance.
* @param messageConverter the converter (may be null).
* @return the {@link MessagingMessageListenerAdapter} instance.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is OK to remove this JavaDoc.
More explanations - better.

protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), new ContainerProperties(topicPartitions));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And probably this one is not covered by tests.
So far we have there in the ShareKafkaMessageListenerContainer only this:

			// Subscribe to topics, just like in the test
			ContainerProperties containerProperties = getContainerProperties();
			this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));

Not sure why you say just like in the test though. Probably just remove that comment?

}
else {
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
new ContainerProperties(endpoint.getTopicPattern()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably KafkaShareConsumer does not support pattern subscription as well...
Please, revise.

Signed-off-by: Soby Chacko <[email protected]>
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);

final MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still some unnecessary change for me to review? 😉

if (endpoint instanceof AbstractKafkaListenerEndpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
}
endpoint.setupListenerContainer(instance, null); // No message converter for MVP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is MVP, please?

received.set(record.value());
latch.countDown();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider to use src/idea/spring-framework.xml for your IntelliJ IDEA editor config.
Then reformat classes, like this and ShareKafkaMessageListenerContainer.
So, all those missed blank lines around class members are going to be added.
Therefore, next time when I do some changes in the class would be consistent with what you've done to it.

Signed-off-by: Soby Chacko <[email protected]>
public class ShareKafkaListenerContainerFactory<K, V>
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>, ApplicationEventPublisherAware, ApplicationContextAware {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like it has to be protected.
Also nit-pick, there is this ctor:

	/**
	 * Create a new accessor for the specified Commons Log category.
	 * @see LogFactory#getLog(Class)
	 */
	public LogAccessor(Class<?> logCategory) {

* Get the container properties.
* @return the container properties
*/
public ContainerProperties getContainerProperties() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need getter for this?

I even wonder if we need this.containerProperties at all...

ShareKafkaMessageListenerContainer<K, V> instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be pattern expression:

if (endpoint instanceof AbstractKafkaListenerEndpoint abstractKafkaListenerEndpoint) {

And not cast in the next line.

* Get the share consumer factory.
* @return the share consumer factory
*/
public ShareConsumerFactory<? super K, ? super V> getShareConsumerFactory() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need getter for this?

* Set the share consumer factory to use for creating containers.
* @param shareConsumerFactory the share consumer factory
*/
public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need setter for this if we request it from the ctor?

}
if (endpoint.getConsumerProperties() != null) {
instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if the logic of this method could be reworked to JavaUtils API.

Signed-off-by: Soby Chacko <[email protected]>
if (endpoint.getAutoStartup() == null) {
instance.setAutoStartup(autoStartup);
}
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some is off with these two calls.
Both of them are about calling instance.setAutoStartup() in the end.
Can we calculate a single autoStartup variable before propagating it down?

Signed-off-by: Soby Chacko <[email protected]>
@artembilan artembilan added this to the 4.0.0-M3 milestone Jul 3, 2025
@artembilan artembilan enabled auto-merge (squash) July 3, 2025 17:11
@artembilan artembilan merged commit de1622d into spring-projects:main Jul 3, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants