Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,13 +21,16 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.dsl.ComponentsRegistration;
import org.springframework.integration.dsl.MessageHandlerSpec;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaderMapper;
Expand Down Expand Up @@ -375,6 +378,31 @@ public S futuresChannel(String futuresChannel) {
return _this();
}

/**
 * Configure the {@link ProducerRecordCreator} used to build each outbound
 * {@link ProducerRecord}. This setting has no effect when
 * {@link #useTemplateConverter(boolean) useTemplateConverter} is set to true.
 * @param creator the record creator to use.
 * @return the spec.
 * @since 5.5.5
 */
public S producerRecordCreator(ProducerRecordCreator<K, V> creator) {
	this.target.setProducerRecordCreator(creator);
	return _this();
}

/**
 * When true, delegate creation of the outbound {@link ProducerRecord} to the
 * template's message converter instead of the
 * {@link #producerRecordCreator(ProducerRecordCreator) producerRecordCreator}.
 * @param use whether the template's converter should be used.
 * @return the spec.
 * @since 5.5.5
 */
public S useTemplateConverter(boolean use) {
	this.target.setUseTemplateConverter(use);
	return _this();
}

/**
* A {@link KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes

private int timeoutBuffer = DEFAULT_TIMEOUT_BUFFER;

private boolean useTemplateConverter;

private volatile byte[] singleReplyTopic;

public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
Expand Down Expand Up @@ -383,8 +385,10 @@ public void setReplyPayloadType(Type payloadType) {
}

/**
* Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}.
* Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored
* if {@link #setUseTemplateConverter(boolean) useTemplateConverter} is true.
* @param producerRecordCreator the creator.
* @see #setUseTemplateConverter(boolean)
*/
public void setProducerRecordCreator(ProducerRecordCreator<K, V> producerRecordCreator) {
Assert.notNull(producerRecordCreator, "'producerRecordCreator' cannot be null");
Expand All @@ -402,6 +406,18 @@ public void setTimeoutBuffer(int timeoutBuffer) {
this.timeoutBuffer = timeoutBuffer;
}

/**
 * When set to true, the outbound {@link ProducerRecord} is built by the
 * template's message converter rather than by the configured
 * {@link #setProducerRecordCreator(ProducerRecordCreator) producerRecordCreator}.
 * @param useTemplateConverter whether to use the template's converter.
 * @since 5.5.5
 * @see #setProducerRecordCreator(ProducerRecordCreator)
 */
public void setUseTemplateConverter(boolean useTemplateConverter) {
	this.useTemplateConverter = useTemplateConverter;
}

@Override
public String getComponentType() {
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
Expand Down Expand Up @@ -541,6 +557,9 @@ private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
if (topic == null) {
topic = this.kafkaTemplate.getDefaultTopic();
}
if (this.useTemplateConverter) {
return (ProducerRecord<K, V>) this.kafkaTemplate.getMessageConverter().fromMessage(message, topic);
}

Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator;
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.MessageBuilder;
Expand All @@ -90,6 +91,7 @@
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
Expand Down Expand Up @@ -783,6 +785,39 @@ void testNoFlush() {
handler.stop();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void conversion() {
	// Stub the producer plumbing so no broker connection is required.
	ProducerFactory producerFactory = mock(ProducerFactory.class);
	Producer mockProducer = mock(Producer.class);
	given(producerFactory.createProducer()).willReturn(mockProducer);
	ListenableFuture sendFuture = mock(ListenableFuture.class);
	willReturn(sendFuture).given(mockProducer).send(any(ProducerRecord.class), any(Callback.class));
	KafkaTemplate template = new KafkaTemplate(producerFactory);
	// A converter that returns a recognizable record when the converter path is taken.
	RecordMessageConverter converter = mock(RecordMessageConverter.class);
	ProducerRecord recordFromConverter = mock(ProducerRecord.class);
	given(converter.fromMessage(any(), any())).willReturn(recordFromConverter);
	template.setMessageConverter(converter);
	KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
	handler.setTopicExpression(new LiteralExpression("bar"));
	handler.setBeanFactory(mock(BeanFactory.class));
	// A creator that returns a recognizable record for the default path.
	ProducerRecordCreator creator = mock(ProducerRecordCreator.class);
	ProducerRecord recordFromCreator = mock(ProducerRecord.class);
	given(creator.create(any(), any(), any(), any(), any(), any(), any())).willReturn(recordFromCreator);
	handler.setProducerRecordCreator(creator);
	handler.afterPropertiesSet();
	handler.start();
	// By default the ProducerRecordCreator builds the outbound record.
	handler.handleMessage(new GenericMessage<>("foo"));
	ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
	verify(mockProducer).send(captor.capture(), any(Callback.class));
	assertThat(captor.getValue()).isSameAs(recordFromCreator);
	// After enabling useTemplateConverter, the template's converter builds it instead.
	handler.setUseTemplateConverter(true);
	handler.handleMessage(new GenericMessage<>("foo"));
	verify(mockProducer, times(2)).send(captor.capture(), any(Callback.class));
	assertThat(captor.getValue()).isSameAs(recordFromConverter);
	handler.stop();
}

@SuppressWarnings("serial")
static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {

Expand Down