diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index ea08d930..46b1a755 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -49,7 +49,7 @@ public KafkaAttribute() /// public Type KeyType { get; set; } - Type valueType; + private Type valueType; /// /// Gets or sets the Avro data type @@ -75,7 +75,7 @@ public Type ValueType /// public string AvroSchema { get; set; } - bool IsValidValueType(Type value) + private bool IsValidValueType(Type value) { return typeof(ISpecificRecord).IsAssignableFrom(value) || @@ -83,5 +83,36 @@ bool IsValidValueType(Type value) value == typeof(byte[]) || value == typeof(string); } + + /// + /// Gets or sets the Maximum transmit message size. Default: 1MB + /// + public int? MaxMessageBytes { get; set; } + + /// + /// Maximum number of messages batched in one MessageSet. default: 10000 + /// + public int? BatchSize { get; set; } + + /// + /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false + /// + public bool? EnableIdempotence { get; set; } + + /// + /// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 + /// + public int? MessageTimeoutMs { get; set; } + + /// + /// The ack timeout of the producer request in milliseconds. default: 5000 + /// + public int? RequestTimeoutMs { get; set; } + + /// + /// How many times to retry sending a failing Message. **Note:** default: 2 + /// + /// Retrying may cause reordering unless EnableIdempotence is set to true. + public int? MaxRetries { get; set; } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs index c3c7a885..9331071c 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs @@ -3,7 +3,6 @@ using System; using System.Threading; -using System.Threading.Tasks; using Confluent.Kafka; using Confluent.SchemaRegistry.Serdes; using Microsoft.Extensions.Logging; @@ -140,7 +139,7 @@ private void DeliveryHandler(DeliveryReport deliveredItem) } else { - this.logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.ToString()); + logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.Reason, deliveredItem.Error.ToString()); } } } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs index d9a67282..7c2529f0 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs @@ -22,7 +22,7 @@ public class KafkaProducerProvider : IKafkaProducerProvider private readonly IConfiguration config; private readonly INameResolver nameResolver; private readonly ILoggerProvider loggerProvider; - ConcurrentDictionary producers = new ConcurrentDictionary(); + private ConcurrentDictionary producers = new ConcurrentDictionary(); public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver, ILoggerProvider loggerProvider) { @@ -33,8 +33,8 @@ public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver, public IKafkaProducer Get(KafkaAttribute attribute) { - var resolvedBrokerList = this.nameResolver.ResolveWholeString(attribute.BrokerList); - var brokerListFromConfig = this.config.GetConnectionStringOrSetting(resolvedBrokerList); + var resolvedBrokerList = nameResolver.ResolveWholeString(attribute.BrokerList); + var brokerListFromConfig = config.GetConnectionStringOrSetting(resolvedBrokerList); if (!string.IsNullOrEmpty(brokerListFromConfig)) { resolvedBrokerList = brokerListFromConfig; @@ -44,7 +44,7 @@ public IKafkaProducer Get(KafkaAttribute attribute) var valueTypeName = attribute.ValueType == null ? string.Empty : attribute.ValueType.AssemblyQualifiedName; var producerKey = $"{resolvedBrokerList}:keyTypeName:valueTypeName"; - return producers.GetOrAdd(producerKey, (k) => this.Create(attribute, resolvedBrokerList)); + return producers.GetOrAdd(producerKey, (k) => Create(attribute, resolvedBrokerList)); } private IKafkaProducer Create(KafkaAttribute attribute, string brokerList) @@ -75,17 +75,19 @@ private IKafkaProducer Create(KafkaAttribute attribute, string brokerList) return (IKafkaProducer)Activator.CreateInstance( typeof(KafkaProducer<,>).MakeGenericType(keyType, valueType), - this.GetProducerConfig(brokerList), + GetProducerConfig(attribute, brokerList), avroSchema, - this.loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"))); + loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"))); } - private ProducerConfig GetProducerConfig(string brokerList) + private ProducerConfig GetProducerConfig(KafkaAttribute attribute, string brokerList) => new ProducerConfig { - return new ProducerConfig() - { - BootstrapServers = brokerList, - }; - } + BootstrapServers = brokerList, + BatchNumMessages = attribute.BatchSize, + EnableIdempotence = attribute.EnableIdempotence, + MessageSendMaxRetries = attribute.MaxRetries, + MessageTimeoutMs = attribute.MessageTimeoutMs, + RequestTimeoutMs = attribute.RequestTimeoutMs, + }; } } \ No newline at end of file