Skip to content

Commit 60f0004

Browse files
committed
Add bindSourceRecord for Kinesis Channel Adapters
1 parent e6e2465 commit 60f0004

File tree

4 files changed

+59
-18
lines changed

4 files changed

+59
-18
lines changed

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ plugins {
1212
id 'eclipse'
1313
id 'idea'
1414
id 'jacoco'
15-
id 'org.sonarqube' version '2.7'
15+
id 'org.sonarqube' version '2.7.1'
1616
id 'checkstyle'
1717
id 'org.ajoberstar.grgit' version '3.1.1'
1818
}
@@ -34,11 +34,11 @@ ext {
3434
assertjVersion = '3.12.2'
3535
awaitilityVersion = '3.1.6'
3636
dynamodbLockClientVersion = '1.1.0'
37-
jacksonVersion = '2.9.8'
37+
jacksonVersion = '2.9.9'
3838
servletApiVersion = '4.0.1'
3939
log4jVersion = '2.11.2'
4040
springCloudAwsVersion = '2.1.1.RELEASE'
41-
springIntegrationVersion = '5.1.4.RELEASE'
41+
springIntegrationVersion = '5.1.6.RELEASE'
4242
kinesisClientVersion = '1.10.0'
4343
kinesisProducerVersion = '0.12.11'
4444

@@ -87,7 +87,7 @@ jacoco {
8787

8888
checkstyle {
8989
configFile = file("${rootDir}/src/checkstyle/checkstyle.xml")
90-
toolVersion = "8.20"
90+
toolVersion = "8.21"
9191
}
9292

9393
dependencies {

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2525
import org.springframework.core.task.TaskExecutor;
2626
import org.springframework.core.task.support.ExecutorServiceAdapter;
27+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2728
import org.springframework.integration.aws.support.AwsHeaders;
2829
import org.springframework.integration.endpoint.MessageProducerSupport;
2930
import org.springframework.integration.mapping.InboundMessageMapper;
@@ -105,6 +106,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {
105106

106107
private String workerId = UUID.randomUUID().toString();
107108

109+
private boolean bindSourceRecord;
110+
108111
public KclMessageDrivenChannelAdapter(String streams) {
109112
this(streams, AmazonKinesisClientBuilder.defaultClient(),
110113
AmazonCloudWatchClientBuilder.defaultClient(), AmazonDynamoDBClientBuilder.defaultClient(),
@@ -188,6 +191,17 @@ public void setWorkerId(String workerId) {
188191
this.workerId = workerId;
189192
}
190193

194+
/**
195+
* Set to true to bind the source consumer record in the header named
196+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
197+
* Does not apply to batch listeners.
198+
* @param bindSourceRecord true to bind.
199+
* @since 2.2
200+
*/
201+
public void setBindSourceRecord(boolean bindSourceRecord) {
202+
this.bindSourceRecord = bindSourceRecord;
203+
}
204+
191205
@Override
192206
protected void onInit() {
193207
super.onInit();
@@ -364,6 +378,10 @@ private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record
364378
.setHeader(AwsHeaders.RECEIVED_STREAM, KclMessageDrivenChannelAdapter.this.stream)
365379
.setHeader(AwsHeaders.SHARD, this.shardId);
366380

381+
if (KclMessageDrivenChannelAdapter.this.bindSourceRecord) {
382+
messageBuilder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
383+
}
384+
367385
if (messageToUse != null) {
368386
messageBuilder.copyHeadersIfAbsent(messageToUse.getHeaders());
369387
}

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.core.AttributeAccessor;
4747
import org.springframework.core.convert.converter.Converter;
4848
import org.springframework.core.serializer.support.DeserializingConverter;
49+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
4950
import org.springframework.integration.aws.support.AwsHeaders;
5051
import org.springframework.integration.endpoint.MessageProducerSupport;
5152
import org.springframework.integration.mapping.InboundMessageMapper;
@@ -159,6 +160,8 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport i
159160

160161
private LockRegistry lockRegistry;
161162

163+
private boolean bindSourceRecord;
164+
162165
private volatile boolean active;
163166

164167
private volatile int consumerInvokerMaxCapacity;
@@ -235,7 +238,7 @@ public void setCheckpointMode(CheckpointMode checkpointMode) {
235238
/**
236239
* Sets the interval between 2 checkpoints. Only used when checkpointMode is periodic.
237240
* @param checkpointsInterval interval between 2 checkpoints (in milliseconds)
238-
* @since 2.2.0
241+
* @since 2.2
239242
*/
240243
public void setCheckpointsInterval(long checkpointsInterval) {
241244
this.checkpointsInterval = checkpointsInterval;
@@ -311,6 +314,17 @@ public void setLockRegistry(LockRegistry lockRegistry) {
311314
this.lockRegistry = lockRegistry;
312315
}
313316

317+
/**
318+
* Set to true to bind the source consumer record in the header named
319+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
320+
* Does not apply to batch listeners.
321+
* @param bindSourceRecord true to bind.
322+
* @since 2.2
323+
*/
324+
public void setBindSourceRecord(boolean bindSourceRecord) {
325+
this.bindSourceRecord = bindSourceRecord;
326+
}
327+
314328
@Override
315329
protected void onInit() {
316330
super.onInit();
@@ -527,7 +541,8 @@ private void populateShardsForStream(final String stream, final CountDownLatch s
527541
}
528542

529543
if (describeStreamResult == null ||
530-
!StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
544+
!StreamStatus.ACTIVE.toString().equals(
545+
describeStreamResult.getStreamDescription().getStreamStatus())) {
531546

532547
if (describeStreamRetries++ > this.describeStreamRetries) {
533548
ResourceNotFoundException resourceNotFoundException =
@@ -541,7 +556,7 @@ private void populateShardsForStream(final String stream, final CountDownLatch s
541556
continue;
542557
}
543558
catch (InterruptedException e) {
544-
Thread.interrupted();
559+
Thread.currentThread().interrupt();
545560
throw new IllegalStateException("The [describeStream] thread for the stream ["
546561
+ stream + "] has been interrupted.", e);
547562
}
@@ -729,10 +744,9 @@ public void run() {
729744
}
730745
}
731746

732-
for (Iterator<ShardConsumer> iterator =
747+
Iterator<ShardConsumer> iterator =
733748
KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator();
734-
iterator.hasNext(); ) {
735-
749+
while (iterator.hasNext()) {
736750
ShardConsumer shardConsumer = iterator.next();
737751
shardConsumer.execute();
738752
if (ConsumerState.STOP == shardConsumer.state) {
@@ -795,7 +809,8 @@ private final class ShardConsumer {
795809
ShardConsumer(KinesisShardOffset shardOffset) {
796810
this.shardOffset = new KinesisShardOffset(shardOffset);
797811
this.key = buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard());
798-
this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key);
812+
this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore,
813+
this.key);
799814
}
800815

801816
void setNotifier(Runnable notifier) {
@@ -838,7 +853,8 @@ void execute() {
838853
if (logger.isInfoEnabled() && this.state == ConsumerState.NEW) {
839854
logger.info("The [" + this + "] has been started.");
840855
}
841-
GetShardIteratorRequest shardIteratorRequest = this.shardOffset.toShardIteratorRequest();
856+
GetShardIteratorRequest shardIteratorRequest =
857+
this.shardOffset.toShardIteratorRequest();
842858
this.shardIterator =
843859
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
844860
.getShardIterator(shardIteratorRequest)
@@ -1034,8 +1050,8 @@ private void processRecords(List<Record> records) {
10341050
}
10351051
else if (CheckpointMode.periodic.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) &&
10361052
System.currentTimeMillis() > nextCheckpointTimeInMillis) {
1037-
this.checkpointer.checkpoint();
1038-
this.nextCheckpointTimeInMillis = System.currentTimeMillis() + checkpointsInterval;
1053+
this.checkpointer.checkpoint();
1054+
this.nextCheckpointTimeInMillis = System.currentTimeMillis() + checkpointsInterval;
10391055
}
10401056
}
10411057

@@ -1068,6 +1084,10 @@ private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record
10681084
.setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey())
10691085
.setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber());
10701086

1087+
if (KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) {
1088+
messageBuilder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
1089+
}
1090+
10711091
if (messageToUse != null) {
10721092
messageBuilder.copyHeadersIfAbsent(messageToUse.getHeaders());
10731093
}
@@ -1157,8 +1177,8 @@ public void run() {
11571177
shardConsumer.task.run();
11581178
}
11591179
catch (Exception e) {
1160-
logger.info("Got an exception " + e + " during [" + shardConsumer + "] task invocation.\n" +
1161-
"Process will be retried on the next iteration.");
1180+
logger.info("Got an exception " + e + " during [" + shardConsumer + "] task invocation"
1181+
+ ".\nProcess will be retried on the next iteration.");
11621182
}
11631183
}
11641184
}

src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.beans.factory.annotation.Autowired;
3434
import org.springframework.context.annotation.Bean;
3535
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3637
import org.springframework.integration.annotation.ServiceActivator;
3738
import org.springframework.integration.aws.KinesisLocalRunning;
3839
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
@@ -90,7 +91,7 @@ public static void tearDown() {
9091
}
9192

9293
@Test
93-
public void testKinesisInboundOutbound() throws InterruptedException {
94+
public void testKinesisInboundOutbound() {
9495
this.kinesisSendChannel.send(
9596
MessageBuilder.withPayload("foo")
9697
.setHeader(AwsHeaders.STREAM, TEST_STREAM)
@@ -107,12 +108,13 @@ public void testKinesisInboundOutbound() throws InterruptedException {
107108
assertThat(receive).isNotNull();
108109
assertThat(receive.getPayload()).isEqualTo(now);
109110
assertThat(receive.getHeaders()).contains(entry("foo", "BAR"));
111+
assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA);
110112

111113
Message<?> errorMessage = this.errorChannel.receive(10_000);
112114
assertThat(errorMessage).isNotNull();
113115
assertThat(errorMessage.getHeaders().get(AwsHeaders.RAW_RECORD)).isNotNull();
114116
assertThat(((Exception) errorMessage.getPayload()).getMessage())
115-
.contains("Channel 'kinesisReceiveChannel' expected one of the following datataypes " +
117+
.contains("Channel 'kinesisReceiveChannel' expected one of the following data types " +
116118
"[class java.util.Date], but received [class java.lang.String]");
117119

118120

@@ -171,6 +173,7 @@ private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter()
171173
adapter.setCheckpointStore(checkpointStore());
172174
adapter.setLockRegistry(lockRegistry());
173175
adapter.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo"));
176+
adapter.setBindSourceRecord(true);
174177

175178
DirectFieldAccessor dfa = new DirectFieldAccessor(adapter);
176179
dfa.setPropertyValue("describeStreamBackoff", 10);

0 commit comments

Comments
 (0)