Skip to content

Commit 74208bb

Browse files
committed
Polish "Auto-configure Kafka listener container with rebalance listener"
Closes gh-16755
1 parent abdc2e1 commit 74208bb

File tree

4 files changed

+21
-21
lines changed

4 files changed

+21
-21
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
4848

4949
private KafkaAwareTransactionManager<Object, Object> transactionManager;
5050

51+
private ConsumerAwareRebalanceListener rebalanceListener;
52+
5153
private ErrorHandler errorHandler;
5254

5355
private BatchErrorHandler batchErrorHandler;
5456

5557
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
5658

57-
private ConsumerAwareRebalanceListener rebalanceListener;
58-
5959
/**
6060
* Set the {@link KafkaProperties} to use.
6161
* @param properties the properties
@@ -89,6 +89,15 @@ void setTransactionManager(
8989
this.transactionManager = transactionManager;
9090
}
9191

92+
/**
93+
* Set the {@link ConsumerAwareRebalanceListener} to use.
94+
* @param rebalanceListener the rebalance listener.
95+
* @since 2.2
96+
*/
97+
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
98+
this.rebalanceListener = rebalanceListener;
99+
}
100+
92101
/**
93102
* Set the {@link ErrorHandler} to use.
94103
* @param errorHandler the error handler
@@ -114,15 +123,6 @@ void setAfterRollbackProcessor(
114123
this.afterRollbackProcessor = afterRollbackProcessor;
115124
}
116125

117-
/**
118-
* Set the {@link ConsumerAwareRebalanceListener} to use.
119-
* @param rebalanceListener the rebalance listener.
120-
* @since 2.2
121-
*/
122-
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
123-
this.rebalanceListener = rebalanceListener;
124-
}
125-
126126
/**
127127
* Configure the specified Kafka listener container factory. The factory can be
128128
* further tuned and default settings can be overridden.

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,33 +58,33 @@ class KafkaAnnotationDrivenConfiguration {
5858

5959
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
6060

61+
private final ConsumerAwareRebalanceListener rebalanceListener;
62+
6163
private final ErrorHandler errorHandler;
6264

6365
private final BatchErrorHandler batchErrorHandler;
6466

6567
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
6668

67-
private final ConsumerAwareRebalanceListener rebalanceListener;
68-
6969
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
7070
ObjectProvider<RecordMessageConverter> messageConverter,
7171
ObjectProvider<BatchMessageConverter> batchMessageConverter,
7272
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
7373
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
74+
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
7475
ObjectProvider<ErrorHandler> errorHandler,
7576
ObjectProvider<BatchErrorHandler> batchErrorHandler,
76-
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
77-
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener) {
77+
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
7878
this.properties = properties;
7979
this.messageConverter = messageConverter.getIfUnique();
8080
this.batchMessageConverter = batchMessageConverter.getIfUnique(
8181
() -> new BatchMessagingMessageConverter(this.messageConverter));
8282
this.kafkaTemplate = kafkaTemplate.getIfUnique();
8383
this.transactionManager = kafkaTransactionManager.getIfUnique();
84+
this.rebalanceListener = rebalanceListener.getIfUnique();
8485
this.errorHandler = errorHandler.getIfUnique();
8586
this.batchErrorHandler = batchErrorHandler.getIfUnique();
8687
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
87-
this.rebalanceListener = rebalanceListener.getIfUnique();
8888
}
8989

9090
@Bean
@@ -97,10 +97,10 @@ public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerF
9797
configurer.setMessageConverter(messageConverterToUse);
9898
configurer.setReplyTemplate(this.kafkaTemplate);
9999
configurer.setTransactionManager(this.transactionManager);
100+
configurer.setRebalanceListener(this.rebalanceListener);
100101
configurer.setErrorHandler(this.errorHandler);
101102
configurer.setBatchErrorHandler(this.batchErrorHandler);
102103
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
103-
configurer.setRebalanceListener(this.rebalanceListener);
104104
return configurer;
105105
}
106106

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -767,8 +767,7 @@ protected static class RebalanceListenerConfiguration {
767767

768768
@Bean
769769
public ConsumerAwareRebalanceListener rebalanceListener() {
770-
return new ConsumerAwareRebalanceListener() {
771-
};
770+
return mock(ConsumerAwareRebalanceListener.class);
772771
}
773772

774773
}

spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic:
61586158
----
61596159

61606160
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
6161-
container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is
6162-
defined, it is automatically associated to the default factory.
6161+
container factory. Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or
6162+
`ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the
6163+
default factory.
61636164

61646165
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean
61656166
is associated to the default factory. If only a `RecordMessageConverter` bean is present

0 commit comments

Comments
 (0)