diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java index cec2e5cdfe1..96aeccacbc2 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.apache.kafka.clients.producer.ProducerRecord; + import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.dsl.ComponentsRegistration; @@ -28,6 +30,7 @@ import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.KafkaHeaderMapper; @@ -375,6 +378,31 @@ public S futuresChannel(String futuresChannel) { return _this(); } + /** + * Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored + * if {@link #useTemplateConverter(boolean) useTemplateConverter} is true. + * @param creator the creator. + * @return the spec. + * @since 5.5.5 + */ + public S producerRecordCreator(ProducerRecordCreator creator) { + this.target.setProducerRecordCreator(creator); + return _this(); + } + + /** + * Set to true to use the template's message converter to create the + * {@link ProducerRecord} instead of the + * {@link #producerRecordCreator(ProducerRecordCreator) producerRecordCreator}. + * @param use true to use the converter. + * @return the spec. + * @since 5.5.5 + */ + public S useTemplateConverter(boolean use) { + this.target.setUseTemplateConverter(use); + return _this(); + } + /** * A {@link KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension. * diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java index 01c3637bd6a..f3a8e9d07e5 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java @@ -163,6 +163,8 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private int timeoutBuffer = DEFAULT_TIMEOUT_BUFFER; + private boolean useTemplateConverter; + private volatile byte[] singleReplyTopic; public KafkaProducerMessageHandler(final KafkaTemplate kafkaTemplate) { @@ -383,8 +385,10 @@ public void setReplyPayloadType(Type payloadType) { } /** - * Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. + * Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored + * if {@link #setUseTemplateConverter(boolean) useTemplateConverter} is true. * @param producerRecordCreator the creator. + * @see #setUseTemplateConverter(boolean) */ public void setProducerRecordCreator(ProducerRecordCreator producerRecordCreator) { Assert.notNull(producerRecordCreator, "'producerRecordCreator' cannot be null"); @@ -402,6 +406,18 @@ public void setTimeoutBuffer(int timeoutBuffer) { this.timeoutBuffer = timeoutBuffer; } + /** + * Set to true to use the template's message converter to create the + * {@link ProducerRecord} instead of the + * {@link #setProducerRecordCreator(ProducerRecordCreator) producerRecordCreator}. + * @param useTemplateConverter true to use the converter. + * @since 5.5.5 + * @see #setProducerRecordCreator(ProducerRecordCreator) + */ + public void setUseTemplateConverter(boolean useTemplateConverter) { + this.useTemplateConverter = useTemplateConverter; + } + @Override public String getComponentType() { return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter"; @@ -541,6 +557,9 @@ private ProducerRecord createProducerRecord(final Message message) { if (topic == null) { topic = this.kafkaTemplate.getDefaultTopic(); } + if (this.useTemplateConverter) { + return (ProducerRecord) this.kafkaTemplate.getMessageConverter().fromMessage(message, topic); + } Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null"); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java index 347bb320f3d..c79b132e91e 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java @@ -73,6 +73,7 @@ import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; +import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator; import org.springframework.integration.kafka.support.KafkaIntegrationHeaders; import org.springframework.integration.kafka.support.KafkaSendFailureException; import org.springframework.integration.support.MessageBuilder; @@ -90,6 +91,7 @@ import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.TransactionSupport; +import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaTransactionManager; @@ -783,6 +785,39 @@ void testNoFlush() { handler.stop(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void conversion() { + ProducerFactory pf = mock(ProducerFactory.class); + Producer producer = mock(Producer.class); + given(pf.createProducer()).willReturn(producer); + ListenableFuture future = mock(ListenableFuture.class); + willReturn(future).given(producer).send(any(ProducerRecord.class), any(Callback.class)); + KafkaTemplate template = new KafkaTemplate(pf); + RecordMessageConverter converter = mock(RecordMessageConverter.class); + ProducerRecord recordFromConverter = mock(ProducerRecord.class); + given(converter.fromMessage(any(), any())).willReturn(recordFromConverter); + template.setMessageConverter(converter); + KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template); + handler.setTopicExpression(new LiteralExpression("bar")); + handler.setBeanFactory(mock(BeanFactory.class)); + ProducerRecordCreator creator = mock(ProducerRecordCreator.class); + ProducerRecord recordFromCreator = mock(ProducerRecord.class); + given(creator.create(any(), any(), any(), any(), any(), any(), any())).willReturn(recordFromCreator); + handler.setProducerRecordCreator(creator); + handler.afterPropertiesSet(); + handler.start(); + handler.handleMessage(new GenericMessage<>("foo")); + ArgumentCaptor captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(producer).send(captor.capture(), any(Callback.class)); + assertThat(captor.getValue()).isSameAs(recordFromCreator); + handler.setUseTemplateConverter(true); + handler.handleMessage(new GenericMessage<>("foo")); + verify(producer, times(2)).send(captor.capture(), any(Callback.class)); + assertThat(captor.getValue()).isSameAs(recordFromConverter); + handler.stop(); + } + @SuppressWarnings("serial") static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {