Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo
|LibkafkaDebug|debug|Both
|MetadataMaxAgeMs|metadata.max.age.ms|Both
|SocketKeepaliveEnable|socket.keepalive.enable|Both
|EnableDeliveryReports|Feature of Confluent.Kafka|Output

**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
**NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,12 @@ public KafkaAttribute()
/// ssl.key.password in librdkafka
/// </summary>
public string SslKeyPassword { get; set; }

/// <summary>
/// Specifies whether to enable notification of delivery reports. Typically you should
/// set this parameter to true. Set it to false for "fire and forget" semantics and
/// a small boost in performance. default: true importance: low
/// </summary>
public bool EnableDeliveryReports { get; set; } = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
{
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
BatchNumMessages = entity.Attribute.BatchSize,
EnableDeliveryReports = entity.Attribute.EnableDeliveryReports,
EnableIdempotence = entity.Attribute.EnableIdempotence,
MessageMaxBytes = entity.Attribute.MaxMessageBytes,
MessageSendMaxRetries = entity.Attribute.MaxRetries,
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,5 +304,37 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
Assert.Equal(sslCa.FullName, config.SslCaLocation);
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
}

[Fact]
public void GetProducerConfig_Copies_Properties_From_Attribute()
{
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
{
EnableDeliveryReports = false,
BatchSize = 123,
EnableIdempotence = true,
MaxMessageBytes = 234,
MaxRetries = 345,
MessageTimeoutMs = 456,
RequestTimeoutMs = 567
};

var entity = new KafkaProducerEntity
{
Attribute = attribute
};

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
var config = factory.GetProducerConfig(entity);

Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
Assert.Equal(attribute.BatchSize, config.BatchNumMessages);
Assert.Equal(attribute.EnableIdempotence, config.EnableIdempotence);
Assert.Equal(attribute.MaxMessageBytes, config.MessageMaxBytes);
Assert.Equal(attribute.MaxRetries, config.MessageSendMaxRetries);
Assert.Equal(attribute.MessageTimeoutMs, config.MessageTimeoutMs);
Assert.Equal(attribute.RequestTimeoutMs, config.RequestTimeoutMs);
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
}
}
}