From aa4823469524af711479c662b54e4e4662ed7fcb Mon Sep 17 00:00:00 2001 From: Brandon H Date: Thu, 4 Apr 2019 10:20:36 -0700 Subject: [PATCH 1/4] Added attribute properties + docs, passed in to ProducerConfig generator --- .../Output/KafkaAttribute.cs | 43 ++++++++++++++++--- .../Output/KafkaProducerProvider.cs | 26 +++++------ 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index ea08d930..570b2713 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -20,7 +20,7 @@ public sealed class KafkaAttribute : Attribute /// Topic name public KafkaAttribute(string topic) { - Topic = topic; + this.Topic = topic; } /// @@ -49,7 +49,7 @@ public KafkaAttribute() /// public Type KeyType { get; set; } - Type valueType; + private Type valueType; /// /// Gets or sets the Avro data type @@ -57,15 +57,15 @@ public KafkaAttribute() /// public Type ValueType { - get => this.valueType; + get => valueType; set { if (value != null && !IsValidValueType(value)) { - throw new ArgumentException($"The value of {nameof(ValueType)} must be a byte[], string or a type that implements {nameof(ISpecificRecord)} or {nameof(Google.Protobuf.IMessage)}. The type {value.Name} does not."); + throw new ArgumentException($"The value of {nameof(this.ValueType)} must be a byte[], string or a type that implements {nameof(ISpecificRecord)} or {nameof(Google.Protobuf.IMessage)}. 
The type {value.Name} does not."); } - this.valueType = value; + valueType = value; } } @@ -75,7 +75,7 @@ public Type ValueType /// public string AvroSchema { get; set; } - bool IsValidValueType(Type value) + private bool IsValidValueType(Type value) { return typeof(ISpecificRecord).IsAssignableFrom(value) || @@ -83,5 +83,36 @@ bool IsValidValueType(Type value) value == typeof(byte[]) || value == typeof(string); } + + /// + /// Gets or sets the Maximum transmit message size. Default: 1MB + /// + public int MaxMessageBytes { get; set; } = 1024000; + + /// + /// Maximum number of messages batched in one MessageSet. default: 10000 + /// + public int BatchSize { get; set; } = 1000; + + /// + /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false + /// + public bool EnableIdempotence { get; set; } = false; + + /// + /// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 + /// + public int MessageTimeoutMs { get; set; } = 300000; + + /// + /// The ack timeout of the producer request in milliseconds. default: 5000 + /// + public int RequestTimeoutMs { get; set; } = 5000; + + /// + /// How many times to retry sending a failing Message. **Note:** default: 2 + /// + /// Retrying may cause reordering unless EnableIdempotence is set to true. 
+ public int MaxRetries { get; set; } = 2; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs index d9a67282..7c2529f0 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerProvider.cs @@ -22,7 +22,7 @@ public class KafkaProducerProvider : IKafkaProducerProvider private readonly IConfiguration config; private readonly INameResolver nameResolver; private readonly ILoggerProvider loggerProvider; - ConcurrentDictionary producers = new ConcurrentDictionary(); + private ConcurrentDictionary producers = new ConcurrentDictionary(); public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver, ILoggerProvider loggerProvider) { @@ -33,8 +33,8 @@ public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver, public IKafkaProducer Get(KafkaAttribute attribute) { - var resolvedBrokerList = this.nameResolver.ResolveWholeString(attribute.BrokerList); - var brokerListFromConfig = this.config.GetConnectionStringOrSetting(resolvedBrokerList); + var resolvedBrokerList = nameResolver.ResolveWholeString(attribute.BrokerList); + var brokerListFromConfig = config.GetConnectionStringOrSetting(resolvedBrokerList); if (!string.IsNullOrEmpty(brokerListFromConfig)) { resolvedBrokerList = brokerListFromConfig; @@ -44,7 +44,7 @@ public IKafkaProducer Get(KafkaAttribute attribute) var valueTypeName = attribute.ValueType == null ? 
string.Empty : attribute.ValueType.AssemblyQualifiedName; var producerKey = $"{resolvedBrokerList}:keyTypeName:valueTypeName"; - return producers.GetOrAdd(producerKey, (k) => this.Create(attribute, resolvedBrokerList)); + return producers.GetOrAdd(producerKey, (k) => Create(attribute, resolvedBrokerList)); } private IKafkaProducer Create(KafkaAttribute attribute, string brokerList) @@ -75,17 +75,19 @@ private IKafkaProducer Create(KafkaAttribute attribute, string brokerList) return (IKafkaProducer)Activator.CreateInstance( typeof(KafkaProducer<,>).MakeGenericType(keyType, valueType), - this.GetProducerConfig(brokerList), + GetProducerConfig(attribute, brokerList), avroSchema, - this.loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"))); + loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"))); } - private ProducerConfig GetProducerConfig(string brokerList) + private ProducerConfig GetProducerConfig(KafkaAttribute attribute, string brokerList) => new ProducerConfig { - return new ProducerConfig() - { - BootstrapServers = brokerList, - }; - } + BootstrapServers = brokerList, + BatchNumMessages = attribute.BatchSize, + EnableIdempotence = attribute.EnableIdempotence, + MessageSendMaxRetries = attribute.MaxRetries, + MessageTimeoutMs = attribute.MessageTimeoutMs, + RequestTimeoutMs = attribute.RequestTimeoutMs, + }; } } \ No newline at end of file From d00b8520b3b8ff0b4c76a30a6dba5f5ac04209be Mon Sep 17 00:00:00 2001 From: Brandon H Date: Thu, 4 Apr 2019 10:29:51 -0700 Subject: [PATCH 2/4] adding error reason output --- .../Output/KafkaProducer.cs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs index c3c7a885..d3fa88e7 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs +++ 
b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs @@ -3,7 +3,6 @@ using System; using System.Threading; -using System.Threading.Tasks; using Confluent.Kafka; using Confluent.SchemaRegistry.Serdes; using Microsoft.Extensions.Logging; @@ -58,14 +57,14 @@ public KafkaProducer( builder.SetKeySerializer(keySerializer); } - this.producer = builder.Build(); + producer = builder.Build(); } public void Flush(CancellationToken cancellationToken) { try { - this.producer.Flush(cancellationToken); + producer.Flush(cancellationToken); } catch (OperationCanceledException) { @@ -73,7 +72,7 @@ public void Flush(CancellationToken cancellationToken) } catch (Exception ex) { - this.logger.LogError(ex, "Error flushing Kafka producer"); + logger.LogError(ex, "Error flushing Kafka producer"); throw; } } @@ -123,11 +122,11 @@ public void Produce(string topic, KafkaEventData item) try { - this.producer.BeginProduce(topicUsed, msg, this.DeliveryHandler); + producer.BeginProduce(topicUsed, msg, DeliveryHandler); } catch (Exception ex) { - this.logger.LogError(ex, "Error producing into {topic}", topicUsed); + logger.LogError(ex, "Error producing into {topic}", topicUsed); throw; } } @@ -136,11 +135,11 @@ private void DeliveryHandler(DeliveryReport deliveredItem) { if (deliveredItem.Error == null || deliveredItem.Error.Code == ErrorCode.NoError) { - this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset); + logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset); } else { - this.logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.ToString()); + logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. 
Reason: {reason}. Full Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.Reason, deliveredItem.Error.ToString()); } } } From 1e1cb2702e2853eb6a0551902402593b465c02bb Mon Sep 17 00:00:00 2001 From: Brandon H Date: Thu, 4 Apr 2019 11:40:29 -0700 Subject: [PATCH 3/4] styling auto-fixes undo --- .../Output/KafkaAttribute.cs | 8 ++++---- .../Output/KafkaProducer.cs | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 570b2713..ff306292 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -20,7 +20,7 @@ public sealed class KafkaAttribute : Attribute /// Topic name public KafkaAttribute(string topic) { - this.Topic = topic; + Topic = topic; } /// @@ -57,15 +57,15 @@ public KafkaAttribute() /// public Type ValueType { - get => valueType; + get => this.valueType; set { if (value != null && !IsValidValueType(value)) { - throw new ArgumentException($"The value of {nameof(this.ValueType)} must be a byte[], string or a type that implements {nameof(ISpecificRecord)} or {nameof(Google.Protobuf.IMessage)}. The type {value.Name} does not."); + throw new ArgumentException($"The value of {nameof(ValueType)} must be a byte[], string or a type that implements {nameof(ISpecificRecord)} or {nameof(Google.Protobuf.IMessage)}. 
The type {value.Name} does not."); } - valueType = value; + this.valueType = value; } } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs index d3fa88e7..9331071c 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs @@ -57,14 +57,14 @@ public KafkaProducer( builder.SetKeySerializer(keySerializer); } - producer = builder.Build(); + this.producer = builder.Build(); } public void Flush(CancellationToken cancellationToken) { try { - producer.Flush(cancellationToken); + this.producer.Flush(cancellationToken); } catch (OperationCanceledException) { @@ -72,7 +72,7 @@ public void Flush(CancellationToken cancellationToken) } catch (Exception ex) { - logger.LogError(ex, "Error flushing Kafka producer"); + this.logger.LogError(ex, "Error flushing Kafka producer"); throw; } } @@ -122,11 +122,11 @@ public void Produce(string topic, KafkaEventData item) try { - producer.BeginProduce(topicUsed, msg, DeliveryHandler); + this.producer.BeginProduce(topicUsed, msg, this.DeliveryHandler); } catch (Exception ex) { - logger.LogError(ex, "Error producing into {topic}", topicUsed); + this.logger.LogError(ex, "Error producing into {topic}", topicUsed); throw; } } @@ -135,7 +135,7 @@ private void DeliveryHandler(DeliveryReport deliveredItem) { if (deliveredItem.Error == null || deliveredItem.Error.Code == ErrorCode.NoError) { - logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset); + this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset); } else { From 3275ce75847a93bf99ac5d2ef29bc75502e0fdc6 Mon Sep 17 00:00:00 2001 From: Brandon H Date: Thu, 4 Apr 2019 12:05:46 -0700 Subject: [PATCH 4/4] 
utilizing nullability for setting ProducerConfig; allows Kafka SDK to control what the defaults are. NOTE: This may necessitate updating the comments on these properties in the future --- .../Output/KafkaAttribute.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index ff306292..46b1a755 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -87,32 +87,32 @@ private bool IsValidValueType(Type value) /// /// Gets or sets the Maximum transmit message size. Default: 1MB /// - public int MaxMessageBytes { get; set; } = 1024000; + public int? MaxMessageBytes { get; set; } /// /// Maximum number of messages batched in one MessageSet. default: 10000 /// - public int BatchSize { get; set; } = 1000; + public int? BatchSize { get; set; } /// /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false /// - public bool EnableIdempotence { get; set; } = false; + public bool? EnableIdempotence { get; set; } /// /// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 /// - public int MessageTimeoutMs { get; set; } = 300000; + public int? MessageTimeoutMs { get; set; } /// /// The ack timeout of the producer request in milliseconds. default: 5000 /// - public int RequestTimeoutMs { get; set; } = 5000; + public int? RequestTimeoutMs { get; set; } /// /// How many times to retry sending a failing Message. 
**Note:** default: 2 /// /// Retrying may cause reordering unless EnableIdempotence is set to true. - public int MaxRetries { get; set; } = 2; + public int? MaxRetries { get; set; } } } \ No newline at end of file