Consumers for new shards are not launched if shard count > concurrency in KinesisMessageDrivenChannelAdapter #167

@amalinovskiy

Description

Currently, org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter#populateConsumer does the following:

  for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
    if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
      consumerInvoker.addConsumer(shardConsumer);
      return;
    }
  }

The early return prevents the this.shardConsumers.put(shardOffset, shardConsumer); call at the end of the method from executing, which in turn seems to lead to the shard never being processed in org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.ConsumerDispatcher#run.

Using a "consumerAdded" flag plus a break instead of the return should solve the issue, I think.
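
To illustrate the proposed flag-and-break approach, here is a minimal, self-contained sketch. It is not the adapter's actual code: the class PopulateConsumerSketch, the Invoker type, the String stand-ins for shard offsets and consumers, and the fallback of creating a new invoker when none has spare capacity are all assumptions made for illustration. Only the loop shape and the final shardConsumers.put mirror the snippet above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the adapter; names are illustrative only.
class PopulateConsumerSketch {

    // Stand-in for ConsumerInvoker: just holds the consumers assigned to it.
    static class Invoker {
        final List<String> consumers = new ArrayList<>();
    }

    final List<Invoker> invokers = new ArrayList<>();
    final Map<String, String> shardConsumers = new HashMap<>();
    final int maxCapacity;

    PopulateConsumerSketch(int maxCapacity) {
        this.maxCapacity = maxCapacity;
    }

    void populateConsumer(String shardOffset) {
        String shardConsumer = "consumer-for-" + shardOffset;

        // Track whether an existing invoker accepted the consumer.
        boolean consumerAdded = false;
        for (Invoker invoker : this.invokers) {
            if (invoker.consumers.size() < this.maxCapacity) {
                invoker.consumers.add(shardConsumer);
                consumerAdded = true;
                break; // leave the loop only, not the whole method
            }
        }

        // Assumed fallback for this sketch: start a new invoker when all are full.
        if (!consumerAdded) {
            Invoker invoker = new Invoker();
            invoker.consumers.add(shardConsumer);
            this.invokers.add(invoker);
        }

        // Reached on every path, so the shard is always registered.
        this.shardConsumers.put(shardOffset, shardConsumer);
    }

    public static void main(String[] args) {
        PopulateConsumerSketch sketch = new PopulateConsumerSketch(2);
        for (int i = 0; i < 3; i++) {
            sketch.populateConsumer("shard-" + i);
        }
        System.out.println("registered shards: " + sketch.shardConsumers.keySet());
        System.out.println("invokers: " + sketch.invokers.size());
    }
}
```

With the original return in place of the flag and break, any consumer handed to an existing invoker would skip the registration at the end of the method, which is the behavior described above.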
