Add consumer flow control API #333

@henry701

Is your feature request related to a problem? Please describe.

In high-volume scenarios, when consuming from the start of a very large stream (many chunks), consumers that process messages asynchronously and take a long time per message keep receiving chunks until the application runs out of memory.

Describe the solution you'd like

Expose a chunk-level flow-control mechanism at the Consumer API level.

This could be implemented by exposing a ChunkListener on the Consumer builder. The listener callback would receive a Context object with information about the received chunk, much like the existing API design.
The crucial part is that the Context would also expose a callback that lets the listener decide when to request additional chunks (internally, by sending Credit messages through the Client).
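To make the shape of the proposal more concrete, here is a minimal sketch of what the listener and its context could look like. Everything in it is hypothetical: `ChunkListener`, `ChunkContext`, `SimpleChunkContext`, and the `creditSender` callback (a stand-in for whatever the Client uses to send Credit messages) are illustrative names, not existing library types.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Hypothetical listener type: invoked once per received chunk.
interface ChunkListener {
    void handle(ChunkContext context);

    // Hypothetical context: chunk information plus the flow-control callback.
    interface ChunkContext {
        long chunkSize();     // number of messages in the chunk
        void markHandled();   // ask for one more chunk (one credit)
    }
}

// One possible implementation, assuming the client exposes some way to send credits.
final class SimpleChunkContext implements ChunkListener.ChunkContext {
    private final long chunkSize;
    private final IntConsumer creditSender; // assumption: wraps the Client's credit call
    private final AtomicBoolean handled = new AtomicBoolean(false);

    SimpleChunkContext(long chunkSize, IntConsumer creditSender) {
        this.chunkSize = chunkSize;
        this.creditSender = creditSender;
    }

    @Override
    public long chunkSize() {
        return chunkSize;
    }

    @Override
    public void markHandled() {
        // Idempotent: a chunk gives back at most one credit, even if called twice.
        if (handled.compareAndSet(false, true)) {
            creditSender.accept(1);
        }
    }
}
```

Making `markHandled()` idempotent is a design choice of this sketch, so that a listener calling it from several threads cannot request more chunks than it actually finished.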

Example (pseudocode off the top of my head; I will elaborate and fix minor errors later, if applicable):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// chunkListener(...) and Consumer.ChunkListener.ChunkContext are the API proposed in this issue; they do not exist yet
ConcurrentLinkedQueue<Consumer.ChunkListener.ChunkContext> unhandledChunks = new ConcurrentLinkedQueue<>();
AtomicLong pendingMessages = new AtomicLong(0);
Consumer consumer = environment.consumerBuilder()
  .stream("sampleStream")
  .name("application-1")
  .manualTrackingStrategy()
  .builder()
  .messageHandler((context, message) -> {
    /* handle message asynchronously here... */
    // Assume all remaining code inside this MessageHandler runs ASYNCHRONOUSLY, on an external Executor or Thread, after the message has been processed
    long localPendingMessages = pendingMessages.decrementAndGet();
    if (conditionToStore()) {
        context.storeOffset();
    }
    // We're out of messages: mark the chunk as handled so that the Client sends a Credit request and more chunks come in later
    if (localPendingMessages == 0) {
        unhandledChunks.poll().markHandled();
    }
  })
  .chunkListener(chunkContext -> {
    unhandledChunks.offer(chunkContext);
    // Account for every message in the newly received chunk
    pendingMessages.addAndGet(chunkContext.chunkSize());
    // chunkContext.markHandled();
    // Alternatively, expose the quantity of messages to ask for, via .credits(int credits)? Or have both APIs?
  })
  .build();

Of course, this is just a naive implementation draft. Better flow control would have a buffer to ensure high throughput, instead of asking for credits only when the consumer is already idle.
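As a rough illustration of the buffered approach, the sketch below withholds credits while the number of unprocessed messages is at or above a low watermark and gives them back as soon as processing drains below it. The class name `WatermarkFlowControl`, its methods, and the `creditSender` callback are all assumptions made for the example; none of them is part of the library.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Hypothetical helper: keeps roughly `lowWatermark` messages buffered at all times.
final class WatermarkFlowControl {
    private final long lowWatermark;
    private final IntConsumer creditSender; // assumption: wraps the Client's credit call
    private final AtomicLong pendingMessages = new AtomicLong();
    private final AtomicInteger withheldCredits = new AtomicInteger();

    WatermarkFlowControl(long lowWatermark, IntConsumer creditSender) {
        this.lowWatermark = lowWatermark;
        this.creditSender = creditSender;
    }

    // Call from the chunk listener: the chunk used up one credit, which we now owe back.
    void onChunk(long chunkSize) {
        pendingMessages.addAndGet(chunkSize);
        withheldCredits.incrementAndGet();
        releaseIfBelowWatermark();
    }

    // Call from the worker once a message has been fully processed.
    void onMessageProcessed() {
        pendingMessages.decrementAndGet();
        releaseIfBelowWatermark();
    }

    private void releaseIfBelowWatermark() {
        // The check and the release are not atomic together, so the buffer may
        // briefly overshoot by a chunk; getAndSet(0) still guarantees that each
        // withheld credit is sent exactly once.
        if (pendingMessages.get() < lowWatermark) {
            int credits = withheldCredits.getAndSet(0);
            if (credits > 0) {
                creditSender.accept(credits);
            }
        }
    }
}
```

With a watermark of 100 and chunks of 50 messages, the first chunk returns its credit immediately, the second one is withheld, and the credit goes out again once a single message has been processed, so the consumer never has to sit idle waiting for the next chunk.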

The default chunkListener would simply ask for additional credits right away, so current behavior is preserved and the change stays backward compatible.
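A minimal sketch of how such a default could be wired into a builder is shown here. `ConsumerBuilderSketch`, its nested types, and `IMMEDIATE_CREDIT` are hypothetical names used only for illustration; the real builder would look different.

```java
import java.util.Objects;

// Hypothetical builder fragment showing only the chunk-listener default.
final class ConsumerBuilderSketch {

    interface ChunkContext {
        void markHandled(); // ask for one more chunk
    }

    interface ChunkListener {
        void handle(ChunkContext context);
    }

    // Default listener: gives the credit back as soon as the chunk arrives,
    // so applications that never set a listener see no behavior change.
    static final ChunkListener IMMEDIATE_CREDIT = ChunkContext::markHandled;

    private ChunkListener chunkListener = IMMEDIATE_CREDIT;

    // Opt-in: applications that want manual flow control replace the default.
    ConsumerBuilderSketch chunkListener(ChunkListener listener) {
        this.chunkListener = Objects.requireNonNull(listener, "listener");
        return this;
    }

    ChunkListener effectiveChunkListener() {
        return chunkListener;
    }
}
```

Only code that explicitly calls `chunkListener(...)` takes over responsibility for requesting more chunks; everything else keeps the automatic credit.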

As a bonus, explicit flow control would also make integration with RxJava possible, which could be very useful when using this library together with Spring, for instance.

Describe alternatives you've considered

An alternative would be to consume messages synchronously from the connection thread, but this blocks some response handling and causes some RPC calls to time out, so it's not ideal.

Additional context

There are similar discussions on issue #262 and https://stackoverflow.com/questions/71932072/add-delay-in-consuming-messages-in-rabbitmq-stream
