Skip to content

Commit b8378fc

Browse files
authored
Refactor KafkaMDChA for code duplication (#8724)
Simplify a `KafkaMessageDrivenChannelAdapter.enhanceHeadersAndSaveAttributes()` method for future maintenance avoiding code duplication
1 parent 7b16183 commit b8378fc

File tree

1 file changed

+32
-41
lines changed

1 file changed

+32
-41
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.atomic.AtomicInteger;
2424
import java.util.function.BiConsumer;
25+
import java.util.function.Supplier;
2526

2627
import org.apache.kafka.clients.consumer.Consumer;
2728
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -150,11 +151,11 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
150151
* @param messageConverter the converter.
151152
*/
152153
public void setMessageConverter(MessageConverter messageConverter) {
153-
if (messageConverter instanceof RecordMessageConverter) {
154-
this.recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
154+
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
155+
this.recordListener.setMessageConverter(recordMessageConverter);
155156
}
156-
else if (messageConverter instanceof BatchMessageConverter) {
157-
this.batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
157+
else if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
158+
this.batchListener.setBatchMessageConverter(batchMessageConverter);
158159
}
159160
else {
160161
throw new IllegalArgumentException(
@@ -262,6 +263,7 @@ public void setPayloadType(Class<?> payloadType) {
262263
*/
263264
public void setOnPartitionsAssignedSeekCallback(
264265
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
266+
265267
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
266268
}
267269

@@ -476,40 +478,35 @@ private boolean passesFilter(ConsumerRecord<K, V> record) {
476478
}
477479

478480
private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
479-
Message<?> messageToReturn = message;
480-
if (message.getHeaders() instanceof KafkaMessageHeaders) {
481-
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
482-
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
483-
AtomicInteger deliveryAttempt =
484-
new AtomicInteger(((RetryContext) ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
485-
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
486-
}
487-
else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
488-
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
489-
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
490-
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
491-
}
492-
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
493-
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
494-
}
481+
Supplier<Message<?>> messageSupplier = () -> message;
482+
BiConsumer<String, Object> headersAcceptor;
483+
484+
if (message.getHeaders() instanceof KafkaMessageHeaders kafkaMessageHeaders) {
485+
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
486+
headersAcceptor = rawHeaders::put;
495487
}
496488
else {
497489
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
498-
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
499-
AtomicInteger deliveryAttempt =
500-
new AtomicInteger(((RetryContext) ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
501-
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
502-
}
503-
else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
504-
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
505-
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
506-
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
507-
}
508-
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
509-
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
510-
}
511-
messageToReturn = builder.build();
490+
headersAcceptor = builder::setHeader;
491+
messageSupplier = builder::build;
492+
}
493+
494+
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
495+
AtomicInteger deliveryAttempt =
496+
new AtomicInteger(((RetryContext) ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
497+
headersAcceptor.accept(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
512498
}
499+
else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
500+
Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
501+
headersAcceptor.accept(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
502+
new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
503+
}
504+
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
505+
headersAcceptor.accept(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
506+
}
507+
508+
Message<?> messageToReturn = messageSupplier.get();
509+
513510
setAttributesIfNecessary(record, messageToReturn, false);
514511
return messageToReturn;
515512
}
@@ -625,12 +622,6 @@ public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T
625622
ATTRIBUTES_HOLDER.remove();
626623
}
627624

628-
@Override
629-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
630-
Throwable throwable) {
631-
// Empty
632-
}
633-
634625
}
635626

636627
}

0 commit comments

Comments
 (0)