Skip to content

Commit 800563e

Browse files
committed
Add EnableDeliveryReports to KafkaAttribute
1 parent 32b56a7 commit 800563e

File tree

4 files changed

+46
-1
lines changed

4 files changed

+46
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ The settings exposed here are targeted to more advanced users that want to custo
322322
|LibkafkaDebug|debug|Both
323323
|MetadataMaxAgeMs|metadata.max.age.ms|Both
324324
|SocketKeepaliveEnable|socket.keepalive.enable|Both
325+
|EnableDeliveryReports|Feature of Confluent.Kafka|Output
325326

326327
**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
327328

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1515
[Binding]
1616
public sealed class KafkaAttribute : Attribute
1717
{
18+
private bool? enableDeliveryReports;
19+
private int? maxMessageBytes;
20+
1821
/// <summary>
1922
/// Initialize a new instance of the <see cref="KafkaAttribute"/>
2023
/// </summary>
@@ -54,7 +57,7 @@ public KafkaAttribute()
5457
/// <summary>
5558
/// Gets or sets the Maximum transmit message size. Default: 1MB
5659
/// </summary>
57-
public int? MaxMessageBytes { get; set; }
60+
public int MaxMessageBytes { get => maxMessageBytes.GetValueOrDefault(1000000); set => maxMessageBytes = value; }
5861

5962
/// <summary>
6063
/// Maximum number of messages batched in one MessageSet. default: 10000
@@ -139,5 +142,12 @@ public KafkaAttribute()
139142
/// ssl.key.password in librdkafka
140143
/// </summary>
141144
public string SslKeyPassword { get; set; }
145+
146+
/// <summary>
147+
/// Specifies whether to enable notification of delivery reports. Typically you should
148+
/// set this parameter to true. Set it to false for "fire and forget" semantics and
149+
/// a small boost in performance. default: true importance: low
150+
/// </summary>
151+
public bool EnableDeliveryReports { get => enableDeliveryReports.GetValueOrDefault(true); set => enableDeliveryReports = value; }
142152
}
143153
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
118118
{
119119
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
120120
BatchNumMessages = entity.Attribute.BatchSize,
121+
EnableDeliveryReports = entity.Attribute.EnableDeliveryReports,
121122
EnableIdempotence = entity.Attribute.EnableIdempotence,
123+
MessageMaxBytes = entity.Attribute.MaxMessageBytes,
122124
MessageSendMaxRetries = entity.Attribute.MaxRetries,
123125
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
124126
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,

test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,5 +304,37 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
304304
Assert.Equal(sslCa.FullName, config.SslCaLocation);
305305
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
306306
}
307+
308+
[Fact]
309+
public void GetProducerConfig_Copies_Properties_From_Attribute()
310+
{
311+
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
312+
{
313+
EnableDeliveryReports = false,
314+
BatchSize = 123,
315+
EnableIdempotence = true,
316+
MaxMessageBytes = 234,
317+
MaxRetries = 345,
318+
MessageTimeoutMs = 456,
319+
RequestTimeoutMs = 567
320+
};
321+
322+
var entity = new KafkaProducerEntity
323+
{
324+
Attribute = attribute
325+
};
326+
327+
var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
328+
var config = factory.GetProducerConfig(entity);
329+
330+
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
331+
Assert.Equal(attribute.BatchSize, config.BatchNumMessages);
332+
Assert.Equal(attribute.EnableIdempotence, config.EnableIdempotence);
333+
Assert.Equal(attribute.MaxMessageBytes, config.MessageMaxBytes);
334+
Assert.Equal(attribute.MaxRetries, config.MessageSendMaxRetries);
335+
Assert.Equal(attribute.MessageTimeoutMs, config.MessageTimeoutMs);
336+
Assert.Equal(attribute.RequestTimeoutMs, config.RequestTimeoutMs);
337+
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
338+
}
307339
}
308340
}

0 commit comments

Comments
 (0)