From f7bf80c12694abd92848fd14ed25daf0848a2934 Mon Sep 17 00:00:00 2001 From: Mirza Ucanbarlic <56406159+supersonicbyte@users.noreply.github.com> Date: Sun, 29 Jun 2025 17:02:59 +0200 Subject: [PATCH 1/2] Add support for `fetch.wait.max.ms` Kafka consumer config Motivation: Currently the API doesn't provide a way to set the `fetch.wait.max.ms` Kafka consumer config. This PR enables the user to set it. Modifications: Added support for `fetch.wait.max.ms` inside of `KafkaConsumerConfiguration`. Result: We can set `fetch.wait.max.msa`. --- .../KafkaConsumerConfiguration.swift | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index df0404ca..465bb5e9 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -188,6 +188,20 @@ public struct KafkaConsumerConfiguration { } } + /// The maximum amount of time the server will block before answering the fetch request + /// there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. + /// Default: `.milliseconds(500)` + public var maximumFetchWaitTime: Duration = .milliseconds(500) { + didSet { + if maximumFetchWaitTime != .zero { + precondition( + maximumFetchWaitTime.canBeRepresentedAsMilliseconds, + "Lowest granularity is milliseconds" + ) + } + } + } + /// Topic metadata options. public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init() @@ -260,6 +274,7 @@ extension KafkaConsumerConfiguration { resultDict["receive.message.max.bytes"] = String(maximumReceiveMessageBytes) resultDict["max.in.flight.requests.per.connection"] = String(maximumInFlightRequestsPerConnection) resultDict["metadata.max.age.ms"] = String(maximumMetadataAge.inMilliseconds) + resultDict["fetch.wait.max.ms"] = String(maximumFetchWaitTime.inMilliseconds) resultDict["topic.metadata.refresh.interval.ms"] = String(topicMetadata.refreshInterval.rawValue) resultDict["topic.metadata.refresh.fast.interval.ms"] = String(topicMetadata.refreshFastInterval.inMilliseconds) resultDict["topic.metadata.refresh.sparse"] = String(topicMetadata.isSparseRefreshingEnabled) From e427727f5bff6ee745c7d8af96286f406db54d64 Mon Sep 17 00:00:00 2001 From: Mirza Ucanbarlic <56406159+supersonicbyte@users.noreply.github.com> Date: Mon, 30 Jun 2025 15:55:06 +0200 Subject: [PATCH 2/2] run swift format --- Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index 465bb5e9..094db301 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -191,7 +191,7 @@ public struct KafkaConsumerConfiguration { /// The maximum amount of time the server will block before answering the fetch request /// there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. /// Default: `.milliseconds(500)` - public var maximumFetchWaitTime: Duration = .milliseconds(500) { + public var maximumFetchWaitTime: Duration = .milliseconds(500) { didSet { if maximumFetchWaitTime != .zero { precondition(