Skip to content

Commit 5adf2d1

Browse files
garyrussellartembilan
authored andcommitted
Add KafkaTemplate Request/Reply support
- Add `AsyncKafkaTemplate` with `sendAndReceive()` - In `@KafkaListener`, echo the `correlationId` header * Polishing - increase timeout on good test - add exception to future on timeout * Polishing - allow `@SendTo` expression to return `byte[]`. * More Polishing - add standard REPLY_TOPIC header and optional REPLY_PARTITION header. * Improve test - wait for reply container assignment. When documenting this, we should recommend that users don't start sending until the reply partitions are assigned. * Polishing; docs; rename to `ReplyingKafkaTemplate` - Async was misleading since KT is also Async * Polishing - PR Comments * Polishing - PR Comments and add `SimpleKafkaHeaderMapper` * Fix package tangle - move new template to its own package. * Use binary for REPLY_PARTITION header. * Polishing - PR comments. * Rebase and final polishing
1 parent 0d09e8d commit 5adf2d1

18 files changed

+1174
-109
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.common.PartitionInfo;
3232
import org.apache.kafka.common.TopicPartition;
3333

34+
import org.springframework.kafka.support.KafkaHeaders;
3435
import org.springframework.kafka.support.LoggingProducerListener;
3536
import org.springframework.kafka.support.ProducerListener;
3637
import org.springframework.kafka.support.SendResult;
@@ -194,6 +195,12 @@ public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
194195
@Override
195196
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
196197
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
198+
if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
199+
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
200+
if (correlationId != null) {
201+
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
202+
}
203+
}
197204
return doSend((ProducerRecord<K, V>) producerRecord);
198205
}
199206

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@
4444
* @author Artem Bilan
4545
*/
4646
public abstract class AbstractMessageListenerContainer<K, V>
47-
implements MessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, SmartLifecycle {
47+
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
4848

4949
/**
5050
* The default {@link SmartLifecycle} phase for listener containers {@value #DEFAULT_PHASE}.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
20+
* Generic message listener container; adds parameters.
21+
*
22+
* @param <K> the key type.
23+
* @param <V> the value type.
24+
*
25+
* @author Gary Russell
26+
* @since 2.1.3
27+
*
28+
*/
29+
public interface GenericMessageListenerContainer<K, V> extends MessageListenerContainer {
30+
31+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.lang.reflect.ParameterizedType;
2121
import java.lang.reflect.Type;
2222
import java.lang.reflect.WildcardType;
23+
import java.nio.ByteBuffer;
24+
import java.nio.charset.StandardCharsets;
2325
import java.util.Collection;
2426
import java.util.List;
2527
import java.util.Map;
@@ -45,13 +47,17 @@
4547
import org.springframework.kafka.listener.ListenerExecutionFailedException;
4648
import org.springframework.kafka.listener.MessageListener;
4749
import org.springframework.kafka.support.Acknowledgment;
50+
import org.springframework.kafka.support.KafkaHeaders;
4851
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4952
import org.springframework.kafka.support.converter.RecordMessageConverter;
53+
import org.springframework.lang.Nullable;
5054
import org.springframework.messaging.Message;
5155
import org.springframework.messaging.MessagingException;
5256
import org.springframework.messaging.converter.MessageConversionException;
5357
import org.springframework.messaging.handler.annotation.Payload;
58+
import org.springframework.messaging.support.MessageBuilder;
5459
import org.springframework.util.Assert;
60+
import org.springframework.util.StringUtils;
5561

5662
/**
5763
* An abstract {@link MessageListener} adapter providing the necessary infrastructure
@@ -154,16 +160,22 @@ protected boolean isConsumerRecordList() {
154160
/**
155161
* Set the topic to which to send any result from the method invocation.
156162
* May be a SpEL expression {@code !{...}} evaluated at runtime.
157-
* @param replyTopic the topic or expression.
163+
* @param replyTopicParam the topic or expression.
158164
* @since 2.0
159165
*/
160-
public void setReplyTopic(String replyTopic) {
166+
public void setReplyTopic(String replyTopicParam) {
167+
String replyTopic = replyTopicParam;
168+
if (!StringUtils.hasText(replyTopic)) {
169+
replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
170+
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
171+
}
161172
if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
162173
this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
163174
}
164175
else {
165176
this.replyTopicExpression = new LiteralExpression(replyTopic);
166177
}
178+
167179
}
168180

169181
/**
@@ -273,7 +285,7 @@ protected void handleResult(Object resultArg, Object request, Object source) {
273285
String replyTopic = evaluateReplyTopic(request, source, resultArg);
274286
Assert.state(replyTopic == null || this.replyTemplate != null,
275287
"a KafkaTemplate is required to support replies");
276-
sendResponse(result, replyTopic);
288+
sendResponse(result, replyTopic, source);
277289
}
278290

279291
private String evaluateReplyTopic(Object request, Object source, Object result) {
@@ -293,30 +305,85 @@ private String evaluateTopic(Object request, Object source, Object result, Expre
293305
}
294306
else {
295307
Object value = sendTo.getValue(this.evaluationContext, new ReplyExpressionRoot(request, source, result));
296-
Assert.state(value instanceof String, "replyTopic expression must evaluate to a String or Address");
308+
boolean isByteArray = value instanceof byte[];
309+
if (!(value instanceof String || isByteArray)) {
310+
throw new IllegalStateException(
311+
"replyTopic expression must evaluate to a String or byte[], it is: "
312+
+ (value == null ? null : value.getClass().getName()));
313+
}
314+
if (isByteArray) {
315+
return new String((byte[]) value, StandardCharsets.UTF_8);
316+
}
297317
return (String) value;
298318
}
299319
}
300320

301-
@SuppressWarnings("unchecked")
321+
/**
322+
* Send the result to the topic.
323+
*
324+
* @param result the result.
325+
* @param topic the topic.
326+
* @deprecated in favor of {@link #sendResponse(Object, String, Object)}.
327+
*/
328+
@Deprecated
302329
protected void sendResponse(Object result, String topic) {
330+
sendResponse(result, topic, null);
331+
}
332+
333+
/**
334+
* Send the result to the topic.
335+
*
336+
* @param result the result.
337+
* @param topic the topic.
338+
* @param source the source (input).
339+
* @since 2.1.3
340+
*/
341+
@SuppressWarnings("unchecked")
342+
protected void sendResponse(Object result, String topic, @Nullable Object source) {
303343
if (topic == null) {
304344
if (this.logger.isDebugEnabled()) {
305345
this.logger.debug("No replyTopic to handle the reply: " + result);
306346
}
307347
}
348+
else if (result instanceof Message) {
349+
this.replyTemplate.send((Message<?>) result);
350+
}
308351
else {
309352
if (result instanceof Collection) {
310353
((Collection<V>) result).forEach(v -> {
311354
this.replyTemplate.send(topic, v);
312355
});
313356
}
314357
else {
315-
this.replyTemplate.send(topic, (V) result);
358+
byte[] correlationId = null;
359+
boolean sourceIsMessage = source instanceof Message;
360+
if (sourceIsMessage
361+
&& ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {
362+
correlationId = ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
363+
}
364+
if (sourceIsMessage) {
365+
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
366+
.setHeader(KafkaHeaders.TOPIC, topic);
367+
if (correlationId != null) {
368+
builder.setHeader(KafkaHeaders.CORRELATION_ID, correlationId);
369+
}
370+
setPartition(builder, ((Message<?>) source));
371+
this.replyTemplate.send(builder.build());
372+
}
373+
else {
374+
this.replyTemplate.send(topic, (V) result);
375+
}
316376
}
317377
}
318378
}
319379

380+
private void setPartition(MessageBuilder<Object> builder, Message<?> source) {
381+
byte[] partitionBytes = source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class);
382+
if (partitionBytes != null) {
383+
builder.setHeader(KafkaHeaders.PARTITION_ID, ByteBuffer.wrap(partitionBytes).getInt());
384+
}
385+
}
386+
320387
protected final String createMessagingErrorMessage(String description, Object payload) {
321388
return description + "\n"
322389
+ "Endpoint handler details:\n"
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.requestreply;
18+
19+
import java.math.BigInteger;
20+
import java.util.Arrays;
21+
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* Wrapper for byte[] that can be used as a hash key. We could have used BigInteger
26+
* instead but this wrapper is much less expensive. We do use a BigInteger in
27+
* {@link #toString()} though.
28+
*
29+
* @author Gary Russell
30+
* @since 2.1.3
31+
*/
32+
public final class CorrelationKey {
33+
34+
final private byte[] correlationId;
35+
36+
private volatile Integer hashCode;
37+
38+
public CorrelationKey(byte[] correlationId) {
39+
Assert.notNull(correlationId, "'correlationId' cannot be null");
40+
this.correlationId = correlationId;
41+
}
42+
43+
public byte[] getCorrelationId() {
44+
return this.correlationId;
45+
}
46+
47+
@Override
48+
public int hashCode() {
49+
if (this.hashCode != null) {
50+
return this.hashCode;
51+
}
52+
final int prime = 31;
53+
int result = 1;
54+
result = prime * result + Arrays.hashCode(this.correlationId);
55+
this.hashCode = result;
56+
return result;
57+
}
58+
59+
@Override
60+
public boolean equals(Object obj) {
61+
if (this == obj) {
62+
return true;
63+
}
64+
if (obj == null) {
65+
return false;
66+
}
67+
if (getClass() != obj.getClass()) {
68+
return false;
69+
}
70+
CorrelationKey other = (CorrelationKey) obj;
71+
if (!Arrays.equals(this.correlationId, other.correlationId)) {
72+
return false;
73+
}
74+
return true;
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return "[" + new BigInteger(this.correlationId) + "]";
80+
}
81+
82+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.requestreply;
18+
19+
import org.apache.kafka.clients.producer.ProducerRecord;
20+
21+
/**
22+
* Request/reply operations.
23+
*
24+
* @param <K> the key type.
25+
* @param <V> the outbound data type.
26+
* @param <R> the reply data type.
27+
*
28+
* @author Gary Russell
29+
* @since 2.1.3
30+
*
31+
*/
32+
public interface ReplyingKafkaOperations<K, V, R> {
33+
34+
/**
35+
* Send a request and receive a reply.
36+
* @param record the record to send.
37+
* @return a RequestReplyFuture.
38+
*/
39+
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
40+
41+
}

0 commit comments

Comments
 (0)