Skip to content

Commit dd8bbdc

Browse files
authored
Exposing AutoOffsetReset property in KafkaOptions (#364)
* Exposing AutoOffsetReset property in KafkaOptions * Adding internal autoOffsetReset field * Reverting Changes to sln file
1 parent 585e97c commit dd8bbdc

File tree

5 files changed

+72
-2
lines changed

5 files changed

+72
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,11 +319,13 @@ The settings exposed here are targeted to more advanced users that want to custo
319319
|MaxPartitionFetchBytes|max.partition.fetch.bytes|Trigger
320320
|FetchMaxBytes|fetch.max.bytes|Trigger
321321
|AutoCommitIntervalMs|auto.commit.interval.ms|Trigger
322+
|AutoOffsetReset|auto.offset.reset|Trigger
322323
|LibkafkaDebug|debug|Both
323324
|MetadataMaxAgeMs|metadata.max.age.ms|Both
324325
|SocketKeepaliveEnable|socket.keepalive.enable|Both
325326

326327
**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
328+
**NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`.
327329

328330
If you are missing an configuration setting please create an issue and describe why you need it.
329331

docs/public-documentation/Trigger.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ The settings exposed here are to customize how librdkafka works. [Librdkafka Doc
691691
|MaxPartitionFetchBytes|max.partition.fetch.bytes|
692692
|FetchMaxBytes|fetch.max.bytes|
693693
|AutoCommitIntervalMs|auto.commit.interval.ms|
694+
|AutoOffsetReset|auto.offset.reset|
694695
|LibkafkaDebug|debug|
695696
|MetadataMaxAgeMs|metadata.max.age.ms|
696697
|SocketKeepaliveEnable|socket.keepalive.enable|

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,19 @@ public int MaxBatchSize
136136
/// <value>The auto commit interval ms.</value>
137137
public int AutoCommitIntervalMs { get; set; } = 200;
138138

139+
AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest;
140+
141+
/// <summary>
142+
/// Ges or sets the AutoOffsetReset option for librdkafka library.
143+
/// default: Earliest
144+
/// Allowed Values: Latest, Earliest
145+
/// Librdkafka: auto.commit.offset
146+
/// </summary>
147+
public AutoOffsetReset AutoOffsetReset {
148+
get => this.autoOffsetReset;
149+
set => this.autoOffsetReset = AutoOffsetReset.Latest.Equals(value) ? AutoOffsetReset.Latest : AutoOffsetReset.Earliest;
150+
}
151+
139152
/// <summary>
140153
/// Gets or sets the debug option for librdkafka library.
141154
/// Default = "" (disable)

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private ConsumerConfig GetConsumerConfiguration()
164164
Debug = this.options.LibkafkaDebug,
165165

166166
// start from earliest if no checkpoint has been committed
167-
AutoOffsetReset = AutoOffsetReset.Earliest,
167+
AutoOffsetReset = this.options.AutoOffsetReset,
168168

169169
// Secure communication/authentication
170170
SaslMechanism = this.listenerConfiguration.SaslMechanism,

test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaListenerTest.cs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,11 +350,65 @@ public async Task When_Options_Are_Set_Should_Be_Set_In_Consumer_Config()
350350
Assert.Equal("path/to/key", target.ConsumerConfig.SslKeyLocation);
351351
Assert.Equal("path/to/cacert", target.ConsumerConfig.SslCaLocation);
352352
Assert.Equal(kafkaOptions.AutoCommitIntervalMs, target.ConsumerConfig.AutoCommitIntervalMs);
353+
Assert.Equal(AutoOffsetReset.Earliest, target.ConsumerConfig.AutoOffsetReset);
354+
Assert.Equal(true, target.ConsumerConfig.EnableAutoCommit);
355+
Assert.Equal(false, target.ConsumerConfig.EnableAutoOffsetStore);
356+
Assert.Equal(180000, target.ConsumerConfig.MetadataMaxAgeMs);
357+
Assert.Equal(true, target.ConsumerConfig.SocketKeepaliveEnable);
358+
359+
await target.StopAsync(default);
360+
}
361+
362+
[Fact]
363+
public async Task When_Options_With_AutoOffsetReset_Latest_Are_Set_Should_Be_Set_In_Consumer_Config()
364+
{
365+
var executor = new Mock<ITriggeredFunctionExecutor>();
366+
var consumer = new Mock<IConsumer<Ignore, string>>();
367+
368+
var listenerConfig = new KafkaListenerConfiguration()
369+
{
370+
BrokerList = "testBroker",
371+
Topic = "topic",
372+
ConsumerGroup = "group1",
373+
SslKeyPassword = "password1",
374+
SslCertificateLocation = "path/to/cert",
375+
SslKeyLocation = "path/to/key",
376+
SslCaLocation = "path/to/cacert"
377+
};
378+
379+
var kafkaOptions = new KafkaOptions()
380+
{
381+
AutoOffsetReset = AutoOffsetReset.Latest
382+
};
383+
384+
var target = new KafkaListenerForTest<Ignore, string>(
385+
executor.Object,
386+
true,
387+
kafkaOptions,
388+
listenerConfig,
389+
requiresKey: true,
390+
valueDeserializer: null,
391+
NullLogger.Instance,
392+
functionId: "testId"
393+
);
394+
395+
target.SetConsumer(consumer.Object);
396+
397+
await target.StartAsync(default);
398+
399+
Assert.Equal(12, target.ConsumerConfig.Count());
400+
Assert.Equal("testBroker", target.ConsumerConfig.BootstrapServers);
401+
Assert.Equal("group1", target.ConsumerConfig.GroupId);
402+
Assert.Equal("password1", target.ConsumerConfig.SslKeyPassword);
403+
Assert.Equal("path/to/cert", target.ConsumerConfig.SslCertificateLocation);
404+
Assert.Equal("path/to/key", target.ConsumerConfig.SslKeyLocation);
405+
Assert.Equal("path/to/cacert", target.ConsumerConfig.SslCaLocation);
406+
Assert.Equal(kafkaOptions.AutoCommitIntervalMs, target.ConsumerConfig.AutoCommitIntervalMs);
407+
Assert.Equal(AutoOffsetReset.Latest, target.ConsumerConfig.AutoOffsetReset);
353408
Assert.Equal(true, target.ConsumerConfig.EnableAutoCommit);
354409
Assert.Equal(false, target.ConsumerConfig.EnableAutoOffsetStore);
355410
Assert.Equal(180000, target.ConsumerConfig.MetadataMaxAgeMs);
356411
Assert.Equal(true, target.ConsumerConfig.SocketKeepaliveEnable);
357-
Assert.Equal(AutoOffsetReset.Earliest, target.ConsumerConfig.AutoOffsetReset);
358412

359413
await target.StopAsync(default);
360414
}

0 commit comments

Comments
 (0)