diff --git a/README.md b/README.md index 013b4125..5e8a1f9f 100644 --- a/README.md +++ b/README.md @@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo |LibkafkaDebug|debug|Both |MetadataMaxAgeMs|metadata.max.age.ms|Both |SocketKeepaliveEnable|socket.keepalive.enable|Both +|EnableDeliveryReports|Feature of Confluent.Kafka|Output **NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187). **NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`. diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 74cc80b0..6dda9d57 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,5 +139,12 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Specifies whether to enable notification of delivery reports. Typically you should + /// set this parameter to true. Set it to false for "fire and forget" semantics and + /// a small boost in performance. default: true importance: low + /// + public bool EnableDeliveryReports { get; set; } = true; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index a65ffa0e..45f93f4b 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -118,7 +118,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) { BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList), BatchNumMessages = entity.Attribute.BatchSize, + EnableDeliveryReports = entity.Attribute.EnableDeliveryReports, EnableIdempotence = entity.Attribute.EnableIdempotence, + MessageMaxBytes = entity.Attribute.MaxMessageBytes, MessageSendMaxRetries = entity.Attribute.MaxRetries, MessageTimeoutMs = entity.Attribute.MessageTimeoutMs, RequestTimeoutMs = entity.Attribute.RequestTimeoutMs, diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index af3c7bc8..067c8021 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -304,5 +304,37 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_ Assert.Equal(sslCa.FullName, config.SslCaLocation); Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation); } + + [Fact] + public void GetProducerConfig_Copies_Properties_From_Attribute() + { + var attribute = new KafkaAttribute("brokers:9092", "myTopic") + { + EnableDeliveryReports = false, + BatchSize = 123, + EnableIdempotence = true, + MaxMessageBytes = 234, + MaxRetries = 345, + MessageTimeoutMs = 456, + RequestTimeoutMs = 567 + }; + + var entity = new KafkaProducerEntity + { + Attribute = attribute + }; + + var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); + var config = factory.GetProducerConfig(entity); + + Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports); + Assert.Equal(attribute.BatchSize, config.BatchNumMessages); + Assert.Equal(attribute.EnableIdempotence, config.EnableIdempotence); + Assert.Equal(attribute.MaxMessageBytes, config.MessageMaxBytes); + Assert.Equal(attribute.MaxRetries, config.MessageSendMaxRetries); + Assert.Equal(attribute.MessageTimeoutMs, config.MessageTimeoutMs); + Assert.Equal(attribute.RequestTimeoutMs, config.RequestTimeoutMs); + Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports); + } } } \ No newline at end of file