Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,16 @@ private void populateConsumer(KinesisShardOffset shardOffset) {
this.consumerExecutor.execute(consumerInvoker);
}
else {
boolean consumerAdded = false;
for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
consumerInvoker.addConsumer(shardConsumer);
return;
consumerAdded = true;
break;
}
}

if (this.concurrency != 0) {
if (this.concurrency != 0 && !consumerAdded) {
ConsumerInvoker firstConsumerInvoker = this.consumerInvokers.get(0);
firstConsumerInvoker.addConsumer(shardConsumer);
this.consumerInvokerMaxCapacity = firstConsumerInvoker.consumers.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,11 +207,12 @@ void testResharding() throws InterruptedException {
Map shardConsumers = TestUtils.getPropertyValue(this.reshardingChannelAdapter, "shardConsumers", Map.class);

int n = 0;
while (!shardConsumers.isEmpty() && n++ < 100) {
while (shardConsumers.size() != 4 && n++ < 100) {
Thread.sleep(100);
}
assertThat(n).isLessThan(100);


// When resharding happens the describeStream() is performed again
verify(this.amazonKinesisForResharding, atLeast(1))
.listShards(any(ListShardsRequest.class));
Expand All @@ -222,7 +223,7 @@ void testResharding() throws InterruptedException {

assertThat(kinesisShardEndedEvent).isNotNull()
.extracting(KinesisShardEndedEvent::getShardKey)
.isEqualTo("SpringIntegration:streamForResharding:closedShard");
.isEqualTo("SpringIntegration:streamForResharding:closedShard4");
}

@Configuration
Expand Down Expand Up @@ -329,23 +330,79 @@ public PollableChannel kinesisChannel() {
public AmazonKinesis amazonKinesisForResharding() {
AmazonKinesis amazonKinesis = mock(AmazonKinesis.class);

// kinesis handles adding a shard by closing a shard and opening 2 new instead, creating a scenario where it
// happens couple of times
given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING)))
.willReturn(new ListShardsResult()
.withShards(new Shard().withShardId("closedShard").withSequenceNumberRange(
new SequenceNumberRange().withEndingSequenceNumber("1"))));
.withShards(
new Shard().withShardId("closedShard1")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1"))))
.willReturn(new ListShardsResult()
.withShards(
new Shard().withShardId("closedShard1")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
new Shard().withShardId("newShard2")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
new Shard().withShardId("newShard3")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
new Shard().withShardId("closedShard4")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4"))))
.willReturn(new ListShardsResult()
.withShards(
new Shard().withShardId("closedShard1")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
new Shard().withShardId("newShard2")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
new Shard().withShardId("newShard3")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
new Shard().withShardId("closedShard4")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4")),
new Shard().withShardId("newShard5")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("5")),
new Shard().withShardId("newShard6")
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("6"))));


setClosedShard(amazonKinesis, "1");
setNewShard(amazonKinesis, "2");
setNewShard(amazonKinesis, "3");
setClosedShard(amazonKinesis, "4");
setNewShard(amazonKinesis, "5");
setNewShard(amazonKinesis, "6");

String shard1Iterator1 = "shard1Iterator1";
return amazonKinesis;
}

private void setClosedShard(AmazonKinesis amazonKinesis, String shardIndex) {
String shardIterator = String.format("shard%sIterator1", shardIndex);

given(amazonKinesis.getShardIterator(
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard").toShardIteratorRequest()))
.willReturn(new GetShardIteratorResult().withShardIterator(shard1Iterator1));
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard" + shardIndex).toShardIteratorRequest()))
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator));

given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shard1Iterator1).withLimit(25)))
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(25)))
.willReturn(new GetRecordsResult().withNextShardIterator(null)
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber("1")
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex)
.withData(ByteBuffer.wrap("foo".getBytes()))));
}

return amazonKinesis;
private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) {
String shardIterator1 = String.format("shard%sIterator1", shardIndex);
String shardIterator2 = String.format("shard%sIterator2", shardIndex);

given(amazonKinesis.getShardIterator(
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest()))
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator1));

given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator2).withLimit(25)))
.willReturn(new GetRecordsResult().withNextShardIterator(shardIterator2)
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex)
.withData(ByteBuffer.wrap("foo".getBytes()))));


given(amazonKinesis.getShardIterator(
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest()))
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator2));
}

@Bean
Expand All @@ -357,6 +414,7 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() {
adapter.setStartTimeout(10000);
adapter.setDescribeStreamRetries(1);
adapter.setRecordsLimit(25);
adapter.setConcurrency(1);

DirectFieldAccessor dfa = new DirectFieldAccessor(adapter);
dfa.setPropertyValue("describeStreamBackoff", 10);
Expand Down