From b2e4da2aea9cfc230c5ec200de30efaad5805634 Mon Sep 17 00:00:00 2001 From: Anton Malinovskiy Date: Mon, 10 Aug 2020 18:58:03 +0200 Subject: [PATCH 1/2] fix: a combination of closing / opening shards can lead to consumer not starting Fixes https://github.com/spring-projects/spring-integration-aws/issues/167 --- .../KinesisMessageDrivenChannelAdapter.java | 6 +- ...nesisMessageDrivenChannelAdapterTests.java | 81 ++++++++++++++++--- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index e17cc650..59294142 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -708,14 +708,16 @@ private void populateConsumer(KinesisShardOffset shardOffset) { this.consumerExecutor.execute(consumerInvoker); } else { + boolean consumerAdded = false; for (ConsumerInvoker consumerInvoker : this.consumerInvokers) { if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) { consumerInvoker.addConsumer(shardConsumer); - return; + consumerAdded = true; + break; } } - if (this.concurrency != 0) { + if (this.concurrency != 0 && !consumerAdded) { ConsumerInvoker firstConsumerInvoker = this.consumerInvokers.get(0); firstConsumerInvoker.addConsumer(shardConsumer); this.consumerInvokerMaxCapacity = firstConsumerInvoker.consumers.size(); diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index c7da48a6..f89b92e7 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast; @@ -207,11 +208,12 @@ void testResharding() throws InterruptedException { Map shardConsumers = TestUtils.getPropertyValue(this.reshardingChannelAdapter, "shardConsumers", Map.class); int n = 0; - while (!shardConsumers.isEmpty() && n++ < 100) { + while (shardConsumers.size() != 4 && n++ < 100) { Thread.sleep(100); } assertThat(n).isLessThan(100); + // When resharding happens the describeStream() is performed again verify(this.amazonKinesisForResharding, atLeast(1)) .listShards(any(ListShardsRequest.class)); @@ -222,7 +224,7 @@ void testResharding() throws InterruptedException { assertThat(kinesisShardEndedEvent).isNotNull() .extracting(KinesisShardEndedEvent::getShardKey) - .isEqualTo("SpringIntegration:streamForResharding:closedShard"); + .isEqualTo("SpringIntegration:streamForResharding:closedShard4"); } @Configuration @@ -329,23 +331,79 @@ public PollableChannel kinesisChannel() { public AmazonKinesis amazonKinesisForResharding() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); + // kinesis handles adding a shard by closing a shard and opening 2 new instead, creating a scenario where it + // happens couple of times given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING))) .willReturn(new ListShardsResult() - .withShards(new Shard().withShardId("closedShard").withSequenceNumberRange( - new SequenceNumberRange().withEndingSequenceNumber("1")))); + .withShards( + new Shard().withShardId("closedShard1") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")))) + .willReturn(new ListShardsResult() + .withShards( + new Shard().withShardId("closedShard1") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")), + new Shard().withShardId("newShard2") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")), + new Shard().withShardId("newShard3") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")), + new Shard().withShardId("closedShard4") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4")))) + .willReturn(new ListShardsResult() + .withShards( + new Shard().withShardId("closedShard1") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")), + new Shard().withShardId("newShard2") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")), + new Shard().withShardId("newShard3") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")), + new Shard().withShardId("closedShard4") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4")), + new Shard().withShardId("newShard5") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("5")), + new Shard().withShardId("newShard6") + .withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("6")))); + + + setClosedShard(amazonKinesis, "1"); + setNewShard(amazonKinesis, "2"); + setNewShard(amazonKinesis, "3"); + setClosedShard(amazonKinesis, "4"); + setNewShard(amazonKinesis, "5"); + setNewShard(amazonKinesis, "6"); - String shard1Iterator1 = "shard1Iterator1"; + return amazonKinesis; + } + + private void setClosedShard(AmazonKinesis amazonKinesis, String shardIndex) { + String shardIterator = String.format("shard%sIterator1", shardIndex); given(amazonKinesis.getShardIterator( - KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard").toShardIteratorRequest())) - .willReturn(new GetShardIteratorResult().withShardIterator(shard1Iterator1)); + KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard" + shardIndex).toShardIteratorRequest())) + .willReturn(new GetShardIteratorResult().withShardIterator(shardIterator)); - given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shard1Iterator1).withLimit(25))) + given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(25))) .willReturn(new GetRecordsResult().withNextShardIterator(null) - .withRecords(new Record().withPartitionKey("partition1").withSequenceNumber("1") + .withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex) .withData(ByteBuffer.wrap("foo".getBytes())))); + } - return amazonKinesis; + private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) { + String shardIterator1 = String.format("shard%sIterator1", shardIndex); + String shardIterator2 = String.format("shard%sIterator2", shardIndex); + + given(amazonKinesis.getShardIterator( + KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest())) + .willReturn(new GetShardIteratorResult().withShardIterator(shardIterator1)); + + given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator2).withLimit(25))) + .willReturn(new GetRecordsResult().withNextShardIterator(shardIterator2) + .withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex) + .withData(ByteBuffer.wrap("foo".getBytes())))); + + + given(amazonKinesis.getShardIterator( + KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest())) + .willReturn(new GetShardIteratorResult().withShardIterator(shardIterator2)); } @Bean @@ -357,6 +415,7 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() { adapter.setStartTimeout(10000); adapter.setDescribeStreamRetries(1); adapter.setRecordsLimit(25); + adapter.setConcurrency(1); DirectFieldAccessor dfa = new DirectFieldAccessor(adapter); dfa.setPropertyValue("describeStreamBackoff", 10); From 449b82048d00a0a9c0fd35d7914074ff7196661f Mon Sep 17 00:00:00 2001 From: Anton Malinovskiy Date: Mon, 10 Aug 2020 19:06:32 +0200 Subject: [PATCH 2/2] chore: removed unused import --- .../aws/inbound/KinesisMessageDrivenChannelAdapterTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index f89b92e7..7b4a1b3d 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast;