KinesisMessageDrivenChannelAdapter repeating TimeoutException on 2nd instance when shard is closed and exhausted #210

@tonketonky

Description

When two application instances that consume a DynamoDB stream via spring-cloud-stream-binder-aws-kinesis are running at the same time, the following behavior can be observed:

  1. The first instance starts 4 shard consumers:
2022-10-12 09:46:26.433  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665553752342-80c2e5dd', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.567  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.687  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665555448236-62c10f13', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.808  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665559594987-5eb20a1f', reset=false}, state=NEW}] has been started.
  2. The second instance starts no shard consumers.
  3. After some time, one shard is closed and a new shard is opened.
  4. The first instance stops the shard consumer for the closed shard and starts a shard consumer for the new shard:
2022-10-12 10:29:12.316  INFO 1525459 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Stopping the [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=STOP}] on the checkpoint [93694400000000019830554236] because the shard has been CLOSED and exhausted.
2022-10-12 10:29:15.197  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665563350807-0315bc8f', reset=false}, state=NEW}] has been started.
  5. So far so good. However, the second instance starts a shard consumer for the closed shard, complains that the lock was not renewed in time, and throws a TimeoutException:
2022-10-12 10:29:18.288  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:29:29.249  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]

2022-10-12 10:30:19.322  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:30:30.261  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]

2022-10-12 10:31:20.327  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:31:31.274  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]
  6. As a consequence, there is an extra record in the SpringIntegrationLockRegistry DynamoDB table for the closed shard, with the second instance as its owner. The second instance reads and updates this record repeatedly, which increases read/write usage. The number of these extra records, and with it the read/write consumption, grows over time (see the sketch below).
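A minimal sketch of how these leftover lock items can be inspected, assuming the default SpringIntegrationLockRegistry table name mentioned above and the AWS SDK for Java v1 generation used by spring-integration-aws 2.5.x; the class name is purely illustrative and pagination is omitted:

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

import java.util.Map;

// Hypothetical helper, not part of the binder: lists the lock items currently held in the
// SpringIntegrationLockRegistry table so stale entries for closed shards can be spotted.
public class LockRegistryInspector {

    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // Scan the lock table (pagination via getLastEvaluatedKey() omitted for brevity).
        ScanRequest scan = new ScanRequest("SpringIntegrationLockRegistry");
        ScanResult result = dynamoDb.scan(scan);

        for (Map<String, AttributeValue> item : result.getItems()) {
            // Each item represents one lock key, e.g.
            // '<consumer-group>:<stream ARN>:<shardId>', plus its current owner.
            System.out.println(item);
        }
    }
}
```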

The applications use spring-cloud-stream version 3.2.4 and spring-cloud-stream-binder-kinesis version 2.2.0.
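For context, the consumer side is a plain Spring Cloud Stream functional binding along these lines. This is only a sketch: the bean name is guessed from the lock key prefix persist_to_jobs_history_dynamodb seen in the logs, and the payload type depends on the configured content-type conversion.

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class AuditLogStreamConfiguration {

    // Hypothetical functional binding; the real bean name, binding name, and payload type
    // may differ. The Kinesis binder delivers one DynamoDB stream record per message.
    @Bean
    public Consumer<Message<String>> persistToJobsHistoryDynamodb() {
        return message -> {
            // Persist the stream record to the jobs-history store (application-specific logic).
            System.out.println("Received DynamoDB stream record: " + message.getPayload());
        };
    }
}
```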

Is there anything that can be done to prevent this behavior?
Thanks!
