Skip to content

KplMessageHandler: flushDuration is not configurable #188

@azahrebelnaLife360

Description

@azahrebelnaLife360

Hey!
I was testing my Kinesis producer implemented by Spring Cloud Stream Kinesis Binder with KPL mode turned on. And I was expecting that KPL will buffer some records under the hood and after recordMaxBufferedTime will send them to Kinesis with the same sequence number. It didn't happen. Then I found that KplMessageHandler which sends records to the KinesisProducer actually flushes each record by itself just after it's sent:

@Override
protected Future<?> handleMessageToAws(Message<?> message) {
	try {
		<kinesisProducer.addUserRecord() is invoked somewhere here>
	}
	finally {
		if (this.flushDuration.toMillis() <= 0) {
			this.kinesisProducer.flush();
		}
	}
}

I hope there are any good reasons to have flushing functionality inside the KplMessageHandler instead of relying on one implemented by KinesisProducer. But can we at least make flushDuration configurable? Because right now seems that Duration.ofMillis(0) value totally cancels any buffering. And it decreases the throughput.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions