Skip to content

Commit 5ce4b1e

Browse files
committed
Fix deprecation warning in Kafka channel adapters
* The `DefaultKafkaHeaderMapper` is deprecated in Spring Kafka, so we cannot `import` it * Also fix `PostgresChannelMessageTableSubscriberTests` for `delay(Duration.ZERO)` on retry to minimize test execution time
1 parent dc4d5fd commit 5ce4b1e

File tree

4 files changed

+4
-7
lines changed

4 files changed

+4
-7
lines changed

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup
248248

249249
int maxAttempts = 2;
250250
postgresSubscribableChannel.setRetryTemplate(
251-
new RetryTemplate(RetryPolicy.builder().maxAttempts(maxAttempts).build()));
251+
new RetryTemplate(RetryPolicy.builder().maxAttempts(maxAttempts).delay(Duration.ZERO).build()));
252252

253253
if (transactionsEnabled) {
254254
postgresSubscribableChannel.setTransactionManager(transactionManager);

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.springframework.kafka.listener.ContainerProperties;
4747
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
4848
import org.springframework.kafka.support.Acknowledgment;
49-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5049
import org.springframework.kafka.support.JacksonPresent;
5150
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
5251
import org.springframework.kafka.support.KafkaHeaders;
@@ -123,7 +122,7 @@ public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListene
123122
}
124123
else if (JacksonPresent.isJackson2Present()) {
125124
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
126-
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
125+
var headerMapper = new org.springframework.kafka.support.DefaultKafkaHeaderMapper();
127126
headerMapper.addTrustedPackages(
128127
org.springframework.integration.support.json.JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES
129128
.toArray(new String[0]));

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
5454
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
5555
import org.springframework.kafka.support.Acknowledgment;
56-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5756
import org.springframework.kafka.support.JacksonPresent;
5857
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
5958
import org.springframework.kafka.support.KafkaHeaders;
@@ -150,7 +149,7 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
150149
this.batchListener.setMessageConverter(messageConverter);
151150
}
152151
else if (JacksonPresent.isJackson2Present()) {
153-
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
152+
var headerMapper = new org.springframework.kafka.support.DefaultKafkaHeaderMapper();
154153
headerMapper.addTrustedPackages(
155154
org.springframework.integration.support.json.JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES
156155
.toArray(new String[0]));

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.springframework.kafka.listener.ErrorHandlingUtils;
6767
import org.springframework.kafka.listener.LoggingCommitCallback;
6868
import org.springframework.kafka.support.Acknowledgment;
69-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7069
import org.springframework.kafka.support.JacksonPresent;
7170
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
7271
import org.springframework.kafka.support.KafkaHeaders;
@@ -273,7 +272,7 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
273272
messagingMessageConverter.setHeaderMapper(headerMapper);
274273
}
275274
else if (JacksonPresent.isJackson2Present()) {
276-
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
275+
var headerMapper = new org.springframework.kafka.support.DefaultKafkaHeaderMapper();
277276
headerMapper.addTrustedPackages(
278277
org.springframework.integration.support.json.JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES
279278
.toArray(new String[0]));

0 commit comments

Comments
 (0)