Skip to content

Commit c5c93ad

Browse files
authored
Fix ProducerInterceptor usage in KafkaTempalte
The result of `ProducerInterceptor.onSend()` is out of use **Cherry-pick to `3.0.x`**
1 parent 5f1b1a5 commit c5c93ad

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
764764
throw ex;
765765
}
766766
}
767+
767768
/**
768769
* Send the producer record.
769770
* @param producerRecord the producer record.
@@ -781,11 +782,9 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
781782
if (this.micrometerHolder != null) {
782783
sample = this.micrometerHolder.start();
783784
}
784-
if (this.producerInterceptor != null) {
785-
this.producerInterceptor.onSend(producerRecord);
786-
}
785+
ProducerRecord<K, V> interceptedRecord = interceptorProducerRecord(producerRecord);
787786
Future<RecordMetadata> sendFuture =
788-
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
787+
producer.send(interceptedRecord, buildCallback(interceptedRecord, producer, future, sample, observation));
789788
// Maybe an immediate failure
790789
if (sendFuture.isDone()) {
791790
try {
@@ -802,10 +801,17 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
802801
if (this.autoFlush) {
803802
flush();
804803
}
805-
this.logger.trace(() -> "Sent: " + KafkaUtils.format(producerRecord));
804+
this.logger.trace(() -> "Sent: " + KafkaUtils.format(interceptedRecord));
806805
return future;
807806
}
808807

808+
private ProducerRecord<K, V> interceptorProducerRecord(ProducerRecord<K, V> producerRecord) {
809+
if (this.producerInterceptor != null) {
810+
return this.producerInterceptor.onSend(producerRecord);
811+
}
812+
return producerRecord;
813+
}
814+
809815
private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
810816
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample, Observation observation) {
811817

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -559,19 +559,21 @@ void testFutureFailureOnSend() {
559559

560560
@SuppressWarnings("unchecked")
561561
@Test
562-
void testProducerInterceptorManagedOnKafkaTemplate() {
562+
void testProducerInterceptorManagedOnKafkaTemplate() throws Exception {
563563

564564
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
565565
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
566566
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
567567
ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);
568+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-1", "bar")).given(producerInterceptor).onSend(any());
568569
template.setProducerInterceptor(producerInterceptor);
569570

570571
template.setDefaultTopic("prod-interceptor-test-1");
571-
template.sendDefault("foo");
572+
CompletableFuture<SendResult<Integer, String>> resultCompletableFuture = template.sendDefault("foo");
572573

573574
verify(producerInterceptor, times(1)).onSend(any(ProducerRecord.class));
574575
verify(producerInterceptor, times(1)).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
576+
assertThat(resultCompletableFuture.get(10, TimeUnit.SECONDS).getProducerRecord()).isEqualTo(new ProducerRecord<>("prod-interceptor-test-1", "bar"));
575577
}
576578

577579
@SuppressWarnings("unchecked")
@@ -591,13 +593,15 @@ void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() {
591593

592594
@SuppressWarnings("unchecked")
593595
@Test
594-
void testCompositeProducerInterceptor() {
596+
void testCompositeProducerInterceptor() throws Exception {
595597

596598
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
597599
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
598600
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
599601
ProducerInterceptor<Integer, String> producerInterceptor1 = Mockito.mock(ProducerInterceptor.class);
600602
ProducerInterceptor<Integer, String> producerInterceptor2 = Mockito.mock(ProducerInterceptor.class);
603+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-3", "bar")).given(producerInterceptor1).onSend(any());
604+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-3", "baz")).given(producerInterceptor2).onSend(any());
601605
CompositeProducerInterceptor<Integer, String> compositeProducerInterceptor =
602606
new CompositeProducerInterceptor<>(producerInterceptor1, producerInterceptor2);
603607
template.setProducerInterceptor(compositeProducerInterceptor);
@@ -606,14 +610,15 @@ void testCompositeProducerInterceptor() {
606610
doReturn(mockProducerRecord).when(producerInterceptor1).onSend(any(ProducerRecord.class));
607611

608612
template.setDefaultTopic("prod-interceptor-test-3");
609-
template.sendDefault("foo");
613+
CompletableFuture<SendResult<Integer, String>> result = template.sendDefault("foo");
610614

611615
InOrder inOrder = inOrder(producerInterceptor1, producerInterceptor2);
612616

613617
inOrder.verify(producerInterceptor1).onSend(any(ProducerRecord.class));
614618
inOrder.verify(producerInterceptor2).onSend(any(ProducerRecord.class));
615619
inOrder.verify(producerInterceptor1).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
616620
inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
621+
assertThat(result.get(10, TimeUnit.SECONDS).getProducerRecord()).isEqualTo(new ProducerRecord<>("prod-interceptor-test-3", "baz"));
617622
}
618623

619624
@ParameterizedTest(name = "{0} is invalid")

0 commit comments

Comments
 (0)