From 654373244543e36359d6f2fb8a6310ef14dce0c3 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Wed, 12 Oct 2022 13:02:39 +0530 Subject: [PATCH 01/15] fixing the producer perf for in-proc --- .../Output/KafkaProducerEntity.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs index 0a2b0f15..e5a819a7 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs @@ -3,6 +3,8 @@ using System; using System.Collections; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -25,17 +27,19 @@ public class KafkaProducerEntity internal async Task SendAndCreateEntityIfNotExistsAsync(T item, Guid functionInstanceId, CancellationToken cancellationToken) { var kafkaProducer = this.KafkaProducerFactory.Create(this); + List tasks = new List(); if (item is ICollection collection) { foreach (var collectionItem in collection) { - await kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem)); + tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem))); } } else { - await kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item)); + tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item))); } + await Task.WhenAll(tasks); } private object GetItemToProduce(T item) From 0f31806094b49ab221333fbe241014c518d1bfa9 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 27 Oct 2022 21:44:05 +0530 Subject: [PATCH 02/15] adding the support for binders --- .../Output/CollectorValueProvider.cs | 8 ++++++- .../Output/KafkaProducerAsyncCollector.cs | 24 +++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs index 3e5db58e..b0664135 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs @@ -2,12 +2,13 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Bindings; namespace Microsoft.Azure.WebJobs.Extensions.Kafka { - internal class CollectorValueProvider : IValueProvider + internal class CollectorValueProvider : IValueBinder { private readonly KafkaProducerEntity entity; private readonly object value; @@ -35,6 +36,11 @@ public Task GetValueAsync() return Task.FromResult(value); } + public Task SetValueAsync(object value, CancellationToken cancellationToken) + { + return ((KafkaProducerAsyncCollector)this.value).FlushAsync(); + } + public string ToInvokeString() { return this.entity.Topic; diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs index 51b213e0..32a8b6cb 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -15,6 +17,7 @@ internal class KafkaProducerAsyncCollector : IAsyncCollector { private readonly KafkaProducerEntity entity; private readonly Guid functionInstanceId; + private List eventList = new List(); public KafkaProducerAsyncCollector(KafkaProducerEntity entity, Guid functionInstanceId) { @@ -46,7 +49,8 @@ public Task AddAsync(T item, CancellationToken cancellationToken) messageToSend = new KafkaEventData(item); } - return entity.SendAndCreateEntityIfNotExistsAsync(messageToSend, functionInstanceId, cancellationToken); + eventList.Add(messageToSend); + return Task.FromResult(item); } private object ConvertToKafkaEventData(T item) @@ -124,10 +128,22 @@ private static object BuildKafkaEventDataForKeyValue(JObject dataObj) return messageToSend; } - public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) { - // Batching not supported. - return Task.FromResult(0); + object[] eventArr = new object[eventList.Count]; + + lock (eventList) + { + eventArr = eventList.ToArray(); + eventList.Clear(); + } + List taskList = new List(); + foreach (object obj in eventArr) + { + taskList.Add(entity.SendAndCreateEntityIfNotExistsAsync(obj, functionInstanceId, cancellationToken)); + } + + await Task.WhenAll(taskList); } } } \ No newline at end of file From 8e9aab79c34fb61882e18d803dec737841c09138 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 5 Nov 2022 17:16:44 +0530 Subject: [PATCH 03/15] fixing the regression issue --- .../Output/CollectorValueProvider.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs index b0664135..6f9540eb 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/CollectorValueProvider.cs @@ -38,7 +38,8 @@ public Task GetValueAsync() public Task SetValueAsync(object value, CancellationToken cancellationToken) { - return ((KafkaProducerAsyncCollector)this.value).FlushAsync(); + object retVal = (this.value.GetType().GetMethod("FlushAsync")).Invoke(this.value, new object[] { System.Reflection.Missing.Value }); + return Task.FromResult(retVal); } public string ToInvokeString() From da69f5e4e6a700a2b4672f72413dd68dfea89f2c Mon Sep 17 00:00:00 2001 From: shrohilla Date: Mon, 7 Nov 2022 18:05:53 +0530 Subject: [PATCH 04/15] adding the disposabale to dispose off all the events in the collectors --- .../Output/KafkaProducerAsyncCollector.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs index 32a8b6cb..2bd87ea9 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs @@ -13,7 +13,7 @@ [assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")] namespace Microsoft.Azure.WebJobs.Extensions.Kafka { - internal class KafkaProducerAsyncCollector : IAsyncCollector + internal class KafkaProducerAsyncCollector : IAsyncCollector, IDisposable { private readonly KafkaProducerEntity entity; private readonly Guid functionInstanceId; @@ -145,5 +145,10 @@ private static object BuildKafkaEventDataForKeyValue(JObject dataObj) await Task.WhenAll(taskList); } + + public async void Dispose() + { + await this.FlushAsync(); + } } } \ No newline at end of file From d401e3378c6e0da3a524b089b73592c0d45930d9 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 12 Nov 2022 18:13:24 +0530 Subject: [PATCH 05/15] adding the EventOrder --- .../Config/EventsOrderType.cs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs new file mode 100644 index 00000000..042c33a5 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka +{ + public enum EventsOrderType + { + SEQUENTIAL, + KEY, + NONE + } +} From 66b38ba30168f996258a65c9af2dd68e44fcc72d Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 12 Nov 2022 18:14:14 +0530 Subject: [PATCH 06/15] adding the fault case --- .../Output/KafkaProducer.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs index 01f07e2c..7280a564 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs @@ -104,6 +104,7 @@ public async Task ProduceAsync(string topic, object item) public void Dispose() { + this.producer?.Flush(); this.producer?.Dispose(); this.producer = null; GC.SuppressFinalize(this); From ec8ebee2fccb38d895973e4b9dc09c8aba281d12 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Wed, 16 Nov 2022 23:13:01 +0530 Subject: [PATCH 07/15] boosting the producer performance --- .../Output/IKafkaProducer.cs | 10 + .../Output/KafkaAttribute.cs | 13 +- .../Output/KafkaProducer.cs | 88 +++++++-- .../Output/KafkaProducerAsyncCollector.cs | 174 +++++++++--------- .../Output/KafkaProducerEntity.cs | 89 ++++++++- 5 files changed, 255 insertions(+), 119 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/IKafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/IKafkaProducer.cs index 85b6b3aa..d0428ff9 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/IKafkaProducer.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/IKafkaProducer.cs @@ -13,5 +13,15 @@ public interface IKafkaProducer : IDisposable /// Produces a Kafka message /// Task ProduceAsync(string topic, object item); + + /// + /// Produces a Kafka message + /// + void Produce(string topic, object item); + + /// + /// Flushing all added messages to broker + /// + void Flush(); } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 74cc80b0..85387a48 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -64,7 +64,7 @@ public KafkaAttribute() /// /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false /// - public bool? EnableIdempotence { get; set; } + public bool EnableIdempotence { get; set; } = false; /// /// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 @@ -139,5 +139,16 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Producer will producer based on Events order type + /// Allowed Values are SEQUENTIAL, KEY, NONE + /// SEQUENTIAL will always write events in order for all events i.e. FIFO, but it will hit the performance + /// KEY will always write events in any order but follow SEQUENTIAL per key basis + /// None will always write events in any order + /// Default is SEQUENTIAL + /// Note : For best performance we recommend None + /// + public EventsOrderType OrderType { get; set; } = EventsOrderType.SEQUENTIAL; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs index 7280a564..3b0f42ce 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs @@ -55,24 +55,72 @@ public KafkaProducer( public async Task ProduceAsync(string topic, object item) { - if (item == null) + ValidateItem(item); + IKafkaEventData actualItem = GetItem(item); + Message msg = BuildMessage(item, actualItem); + string topicUsed = FindTopic(topic, actualItem); + + try { - throw new ArgumentNullException(nameof(item)); + var deliveryResult = await this.producer.ProduceAsync(topicUsed, msg); + + this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset); + } + catch (ProduceException produceException) + { + logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString()); + throw; + } + catch (Exception ex) + { + this.logger.LogError(ex, "Error producing into {topic}", topicUsed); + throw; } + } - var actualItem = (IKafkaEventData)item; - if (actualItem == null) + public void Produce(string topic, object item) + { + ValidateItem(item); + IKafkaEventData actualItem = GetItem(item); + Message msg = BuildMessage(item, actualItem); + string topicUsed = FindTopic(topic, actualItem); + + try { - throw new ArgumentException($"Message value is not of the expected type. Expected: {typeof(KafkaEventData).Name}. Actual: {item.GetType().Name}"); + logger.LogInformation("in Produce method"); + this.producer.Produce(topicUsed, msg, + deliveryResult => this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset)); + } + catch (ProduceException produceException) + { + logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString()); + throw; + } + catch (Exception ex) + { + this.logger.LogError(ex, "Error producing into {topic}", topicUsed); + throw; } + } - if (actualItem.Value == null) + public void Flush() + { + this.producer.Flush(); + } + + private static IKafkaEventData GetItem(object item) + { + IKafkaEventData actualItem = (IKafkaEventData)item; + if (actualItem == null) { - throw new ArgumentException("Message value was not defined"); + throw new ArgumentException($"Message value is not of the expected type. Expected: {typeof(KafkaEventData).Name}. Actual: {item.GetType().Name}"); } - var msg = MessageBuilder.BuildFrom(actualItem); + return actualItem; + } + private static string FindTopic(string topic, IKafkaEventData actualItem) + { var topicUsed = topic; if (string.IsNullOrEmpty(topic)) { @@ -84,21 +132,23 @@ public async Task ProduceAsync(string topic, object item) } } - try - { - var deliveryResult = await this.producer.ProduceAsync(topicUsed, msg); + return topicUsed; + } - this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset); - } - catch (ProduceException produceException) + private Message BuildMessage(object item, IKafkaEventData actualItem) + { + if (actualItem.Value == null) { - logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString()); - throw; + throw new ArgumentException("Message value was not defined"); } - catch (Exception ex) + return MessageBuilder.BuildFrom(actualItem); + } + + private static void ValidateItem(object item) + { + if (item == null) { - this.logger.LogError(ex, "Error producing into {topic}", topicUsed); - throw; + throw new ArgumentNullException(nameof(item)); } } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs index 2bd87ea9..ec2cff83 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs @@ -8,6 +8,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Timers; using Newtonsoft.Json.Linq; [assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")] @@ -18,6 +19,7 @@ internal class KafkaProducerAsyncCollector : IAsyncCollector, IDisposable private readonly KafkaProducerEntity entity; private readonly Guid functionInstanceId; private List eventList = new List(); + private IConverter kafkaEventDataConverter = new KafkaEventDataConverter(); public KafkaProducerAsyncCollector(KafkaProducerEntity entity, Guid functionInstanceId) { @@ -37,118 +39,112 @@ public Task AddAsync(T item, CancellationToken cancellationToken) throw new InvalidOperationException("Cannot produce a null message instance."); } - object messageToSend = item; - - if (item.GetType() == typeof(string)) - { - messageToSend = ConvertToKafkaEventData(item); - } + eventList.Add(kafkaEventDataConverter.Convert(item)); + return Task.CompletedTask; + } - if (item.GetType() == typeof(byte[])) + public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + List eventObjList; + lock (eventList) { - messageToSend = new KafkaEventData(item); + eventObjList = new List(eventList); + eventList.Clear(); } - eventList.Add(messageToSend); - return Task.FromResult(item); + await entity.SendAndCreateEntityIfNotExistsAsync(eventObjList, functionInstanceId, cancellationToken); } - private object ConvertToKafkaEventData(T item) + public async void Dispose() { - try - { - return BuildKafkaDataEvent(item); - } - catch (Exception) - { - return new KafkaEventData(item); - } + await this.FlushAsync(); } - private object BuildKafkaDataEvent(T item) + private class KafkaEventDataConverter : IConverter { - JObject dataObj = JObject.Parse(item.ToString()); - if (dataObj == null) - { - return new KafkaEventData(item); + public object Convert(T item) + { + if (item.GetType() == typeof(string)) + { + return ConvertToKafkaEventData(item); + } + if (item.GetType() == typeof(byte[])) + { + return new KafkaEventData(item); + } + return item; } - if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic") - && dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers")) + private object ConvertToKafkaEventData(T item) { - return BuildKafkaEventData(dataObj); + try + { + return BuildKafkaDataEvent(item); + } + catch (Exception) + { + return new KafkaEventData(item); + } } - return new KafkaEventData(item); - } - - private object BuildKafkaEventData(JObject dataObj) - { - if (dataObj["Key"] != null) - { - return BuildKafkaEventDataForKeyValue(dataObj); - } - else - { - return BuildKafkaEventDataForValue(dataObj); + private object BuildKafkaDataEvent(T item) + { + JObject dataObj = JObject.Parse(item.ToString()); + if (dataObj == null) + { + return new KafkaEventData(item); + } + if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic") + && dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers")) + { + return BuildKafkaEventData(dataObj); + } + return new KafkaEventData(item); } - } - private static object BuildKafkaEventDataForValue(JObject dataObj) - { - KafkaEventData messageToSend = new KafkaEventData((string)dataObj["Value"]); - messageToSend.Timestamp = (DateTime)dataObj["Timestamp"]; - messageToSend.Partition = (int)dataObj["Partition"]; - JArray headerList = (JArray)dataObj["Headers"]; - foreach (JObject header in headerList) + private object BuildKafkaEventData(JObject dataObj) { - messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"])); + if (dataObj["Key"] != null) + { + return BuildKafkaEventDataForKeyValue(dataObj); + } + return BuildKafkaEventDataForValue(dataObj); } - return messageToSend; - } - private static object BuildKafkaEventDataForKeyValue(JObject dataObj) - { - string value = null; - if (dataObj["Value"] != null && dataObj["Value"].Type.ToString().Equals("Object")) - { - value = Newtonsoft.Json.JsonConvert.SerializeObject(dataObj["Value"]); - } else - { - value = (string)dataObj["Value"]; - } - KafkaEventData messageToSend = new KafkaEventData((string)dataObj["Key"], value); - messageToSend.Timestamp = (DateTime)dataObj["Timestamp"]; - messageToSend.Partition = (int)dataObj["Partition"]; - JArray headerList = (JArray)dataObj["Headers"]; - foreach (JObject header in headerList) - { - messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"])); + private static object BuildKafkaEventDataForValue(JObject dataObj) + { + KafkaEventData messageToSend = new KafkaEventData((string)dataObj["Value"]); + messageToSend.Timestamp = (DateTime)dataObj["Timestamp"]; + messageToSend.Partition = (int)dataObj["Partition"]; + JArray headerList = (JArray)dataObj["Headers"]; + foreach (JObject header in headerList) + { + messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"])); + } + return messageToSend; } - return messageToSend; - } - public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) - { - object[] eventArr = new object[eventList.Count]; - - lock (eventList) - { - eventArr = eventList.ToArray(); - eventList.Clear(); + private static object BuildKafkaEventDataForKeyValue(JObject dataObj) + { + string value = null; + if (dataObj["Value"] != null && dataObj["Value"].Type.ToString().Equals("Object")) + { + value = Newtonsoft.Json.JsonConvert.SerializeObject(dataObj["Value"]); + } + else + { + value = (string)dataObj["Value"]; + } + KafkaEventData messageToSend = new KafkaEventData((string)dataObj["Key"], value); + messageToSend.Timestamp = (DateTime)dataObj["Timestamp"]; + messageToSend.Partition = (int)dataObj["Partition"]; + JArray headerList = (JArray)dataObj["Headers"]; + foreach (JObject header in headerList) + { + messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"])); + } + return messageToSend; } - List taskList = new List(); - foreach (object obj in eventArr) - { - taskList.Add(entity.SendAndCreateEntityIfNotExistsAsync(obj, functionInstanceId, cancellationToken)); - } - - await Task.WhenAll(taskList); - } - - public async void Dispose() - { - await this.FlushAsync(); } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs index e5a819a7..d6034a5c 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs @@ -1,12 +1,14 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using Confluent.Kafka; using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Xml; namespace Microsoft.Azure.WebJobs.Extensions.Kafka { @@ -24,24 +26,91 @@ public class KafkaProducerEntity public KafkaAttribute Attribute { get; set; } - internal async Task SendAndCreateEntityIfNotExistsAsync(T item, Guid functionInstanceId, CancellationToken cancellationToken) + internal Task SendAndCreateEntityIfNotExistsAsync(T item, Guid functionInstanceId, CancellationToken cancellationToken) { var kafkaProducer = this.KafkaProducerFactory.Create(this); - List tasks = new List(); - if (item is ICollection collection) + + if (!(item is ICollection)) { - foreach (var collectionItem in collection) - { - tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem))); - } + //await kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item)); + kafkaProducer.Produce(this.Topic, this.GetItemToProduce(item)); + kafkaProducer.Flush(); + return Task.CompletedTask; } - else + ICollection collection = (ICollection)item; + /*if (EventsOrderType.KEY.Equals(this.Attribute.OrderType)) { - tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item))); + ProduceKafkaEventsByKey(collection, kafkaProducer); + }*/ + + //await ProduceEvents(collection, kafkaProducer); + //ProduceEvents(collection, kafkaProducer); + ProduceEvent(collection, kafkaProducer); + return Task.CompletedTask; + } + + /*private async void ProduceKafkaEventsByKey(ICollection items, IKafkaProducer kafkaProducer) + { + IDictionary> eventMap = BuildKeyDictionary(items); + if (eventMap.Count == 0) + { + return; } - await Task.WhenAll(tasks); + List taskList = new List(); + foreach (KeyValuePair> entry in eventMap) + { + taskList.Add(ProduceEvents(entry.Value, kafkaProducer)); + } + await Task.WhenAll(taskList); + } + + private static IDictionary> BuildKeyDictionary(ICollection items) + { + IDictionary> eventMap = new Dictionary>(); + foreach (var item in items) + { + object key = item.GetType().GetProperty("Key").GetValue(item); + List eventDataList = eventMap.TryGetValue(key, out eventDataList) ? eventDataList : new List(); + eventDataList.Add(item); + eventMap[key] = eventDataList; + } + + return eventMap; } + private void ProduceEvents(ICollection collection, IKafkaProducer kafkaProducer) + { + if (collection == null || collection.Count == 0) + { + return; + } + if (EventsOrderType.NONE.Equals(this.Attribute.OrderType)) + { + await ProduceEventsAsync(collection, kafkaProducer); + return; + } + ProduceEvent(collection, kafkaProducer); + }*/ + + private void ProduceEvent(ICollection collection, IKafkaProducer kafkaProducer) + { + foreach (var collectionItem in collection) + { + kafkaProducer.Produce(this.Topic, this.GetItemToProduce(collectionItem)); + } + kafkaProducer.Flush(); + } + + /*private async Task ProduceEventsAsync(ICollection collection, IKafkaProducer kafkaProducer) + { + List tasks = new List(); + foreach (var collectionItem in collection) + { + tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem))); + } + await Task.WhenAll(tasks); + }*/ + private object GetItemToProduce(T item) { if (item is IKafkaEventData) From 6edda93b582cf56f6d0f8dbfd0b600a7121d7948 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 11:20:26 +0530 Subject: [PATCH 08/15] removing the app setting config variable --- .../Config/EventsOrderType.cs | 16 ---------------- .../Output/KafkaAttribute.cs | 11 ----------- 2 files changed, 27 deletions(-) delete mode 100644 src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs deleted file mode 100644 index 042c33a5..00000000 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/EventsOrderType.cs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Text; - -namespace Microsoft.Azure.WebJobs.Extensions.Kafka -{ - public enum EventsOrderType - { - SEQUENTIAL, - KEY, - NONE - } -} diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 85387a48..5a7745de 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,16 +139,5 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } - - /// - /// Producer will producer based on Events order type - /// Allowed Values are SEQUENTIAL, KEY, NONE - /// SEQUENTIAL will always write events in order for all events i.e. FIFO, but it will hit the performance - /// KEY will always write events in any order but follow SEQUENTIAL per key basis - /// None will always write events in any order - /// Default is SEQUENTIAL - /// Note : For best performance we recommend None - /// - public EventsOrderType OrderType { get; set; } = EventsOrderType.SEQUENTIAL; } } \ No newline at end of file From d69baa489c192ed8292cc0476b8475f9507b5ee7 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 11:42:48 +0530 Subject: [PATCH 09/15] reverting the change of Enable Idempotency --- .../Output/KafkaAttribute.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 5a7745de..74cc80b0 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -64,7 +64,7 @@ public KafkaAttribute() /// /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false /// - public bool EnableIdempotence { get; set; } = false; + public bool? EnableIdempotence { get; set; } /// /// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 From 413579e61bf83060218fae72cc1a70715dd7cc91 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 11:48:41 +0530 Subject: [PATCH 10/15] refactoring the code and removing uneccessary comments --- .../Output/Converter/IConverter.cs | 13 ++++ .../Output/KafkaProducerEntity.cs | 64 +------------------ 2 files changed, 14 insertions(+), 63 deletions(-) create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/Converter/IConverter.cs diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/Converter/IConverter.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/Converter/IConverter.cs new file mode 100644 index 00000000..984a3123 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/Converter/IConverter.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka.Output +{ + public interface IConverter + { + Response Convert(Request request); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs index d6034a5c..2fecd941 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs @@ -32,67 +32,15 @@ internal Task SendAndCreateEntityIfNotExistsAsync(T item, Guid functionInstan if (!(item is ICollection)) { - //await kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item)); kafkaProducer.Produce(this.Topic, this.GetItemToProduce(item)); kafkaProducer.Flush(); return Task.CompletedTask; } - ICollection collection = (ICollection)item; - /*if (EventsOrderType.KEY.Equals(this.Attribute.OrderType)) - { - ProduceKafkaEventsByKey(collection, kafkaProducer); - }*/ - - //await ProduceEvents(collection, kafkaProducer); - //ProduceEvents(collection, kafkaProducer); - ProduceEvent(collection, kafkaProducer); + ProduceEvents((ICollection)item, kafkaProducer); return Task.CompletedTask; } - /*private async void ProduceKafkaEventsByKey(ICollection items, IKafkaProducer kafkaProducer) - { - IDictionary> eventMap = BuildKeyDictionary(items); - if (eventMap.Count == 0) - { - return; - } - List taskList = new List(); - foreach (KeyValuePair> entry in eventMap) - { - taskList.Add(ProduceEvents(entry.Value, kafkaProducer)); - } - await Task.WhenAll(taskList); - } - - private static IDictionary> BuildKeyDictionary(ICollection items) - { - IDictionary> eventMap = new Dictionary>(); - foreach (var item in items) - { - object key = item.GetType().GetProperty("Key").GetValue(item); - List eventDataList = eventMap.TryGetValue(key, out eventDataList) ? eventDataList : new List(); - eventDataList.Add(item); - eventMap[key] = eventDataList; - } - - return eventMap; - } - private void ProduceEvents(ICollection collection, IKafkaProducer kafkaProducer) - { - if (collection == null || collection.Count == 0) - { - return; - } - if (EventsOrderType.NONE.Equals(this.Attribute.OrderType)) - { - await ProduceEventsAsync(collection, kafkaProducer); - return; - } - ProduceEvent(collection, kafkaProducer); - }*/ - - private void ProduceEvent(ICollection collection, IKafkaProducer kafkaProducer) { foreach (var collectionItem in collection) { @@ -101,16 +49,6 @@ private void ProduceEvent(ICollection collection, IKafkaProducer kafkaProducer) kafkaProducer.Flush(); } - /*private async Task ProduceEventsAsync(ICollection collection, IKafkaProducer kafkaProducer) - { - List tasks = new List(); - foreach (var collectionItem in collection) - { - tasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem))); - } - await Task.WhenAll(tasks); - }*/ - private object GetItemToProduce(T item) { if (item is IKafkaEventData) From 470c810150d13b37a491fb2eb2a5f1d4c03becf1 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 11:51:51 +0530 Subject: [PATCH 11/15] changing the converter --- .../Output/KafkaProducerAsyncCollector.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs index ec2cff83..ef4b9694 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using System.Timers; +using Microsoft.Azure.WebJobs.Extensions.Kafka.Output; using Newtonsoft.Json.Linq; [assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")] From 34eca5c27dbe97f62587bc39dc669bb7c21d330e Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 13:01:37 +0530 Subject: [PATCH 12/15] adding the unit test for convering the converter cases along with test --- .../output/KafkaProducerAsyncCollectorTest.cs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/output/KafkaProducerAsyncCollectorTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/output/KafkaProducerAsyncCollectorTest.cs index 6c540344..62127e54 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/output/KafkaProducerAsyncCollectorTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/output/KafkaProducerAsyncCollectorTest.cs @@ -7,6 +7,7 @@ using Xunit; using Moq; using System.Threading.Tasks; +using Newtonsoft.Json.Linq; namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests.output { @@ -78,5 +79,41 @@ public void AddAsync_Item_Is_Of_KafkaEventData_Json_String_Header_Types() Task task = asyncCollector.AddAsync(jsonStringValueHeader, default); Assert.True(task.IsCompleted); } + + [Fact] + public void AddAsync_Item_Is_Of_KafkaEventData_Json_String_Key_Types() + { + BuildMockData(); + JObject dataObj = JObject.Parse(jsonStringValueHeader); + dataObj["Key"] = 1; + IAsyncCollector asyncCollector = new KafkaProducerAsyncCollector( + kafkaProducerEntity, Guid.NewGuid()); + + Task task = asyncCollector.AddAsync(dataObj.ToString(), default); + Assert.True(task.IsCompleted); + } + + [Fact] + public void AddAsync_Item_Is_Of_KafkaEventData_Json_String_Key_Types_Flush() + { + BuildMockData(); + List jobjList = new List(); + for (int i=0; i<20; i++) + { + JObject dataObj = JObject.Parse(jsonStringValueHeader); + dataObj["Key"] = i % 2; + jobjList.Add(dataObj); + } + IAsyncCollector asyncCollector = new KafkaProducerAsyncCollector( + kafkaProducerEntity, Guid.NewGuid()); + + foreach (JObject dataObj in jobjList) + { + Task task = asyncCollector.AddAsync(dataObj.ToString(), default); + Assert.True(task.IsCompleted); + } + Task flushTask = asyncCollector.FlushAsync(); + Assert.True(flushTask.IsCompleted); + } } } From dae3bc5f2ae9478ded2878b39455c27de3ef775c Mon Sep 17 00:00:00 2001 From: shrohilla Date: Thu, 17 Nov 2022 13:42:00 +0530 Subject: [PATCH 13/15] optimizing the code --- .../Output/KafkaProducerEntity.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs index 2fecd941..82780ac2 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs @@ -30,13 +30,14 @@ internal Task SendAndCreateEntityIfNotExistsAsync(T item, Guid functionInstan { var kafkaProducer = this.KafkaProducerFactory.Create(this); - if (!(item is ICollection)) + if (item is ICollection) { - kafkaProducer.Produce(this.Topic, this.GetItemToProduce(item)); - kafkaProducer.Flush(); + ProduceEvents((ICollection)item, kafkaProducer); return Task.CompletedTask; } - ProduceEvents((ICollection)item, kafkaProducer); + //await kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item)); + kafkaProducer.Produce(this.Topic, this.GetItemToProduce(item)); + kafkaProducer.Flush(); return Task.CompletedTask; } From c3443c986990e41745e0b3d96bdfde9734c5cedc Mon Sep 17 00:00:00 2001 From: shrohilla Date: Fri, 18 Nov 2022 01:06:50 +0530 Subject: [PATCH 14/15] fixing the regression --- .../Serialization/SerializationHelper.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs index 2175ae60..dd837511 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs @@ -72,7 +72,14 @@ internal static object ResolveValueSerializer(Type valueType, string specifiedAv } var schemaRegistry = new LocalSchemaRegistry(specifiedAvroSchema); - return Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + var serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + serializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); + //SyncOverAsyncSerializerExtensionMethods.AsSyncOverAsync(serializer); + //typeof(serializer).GetMethods(BindingFlags.Static | BindingFlags.Public).Where(mi => mi.Name.Equals("AsSyncOverAsync")); + /*//((AvroSerializer) serializer).As + var t = (serializer as IAsyncSerializer<>); + t.AsSyncOverAsync()*/ + return serializer; } return null; From 35a86769e2e47389462357ccca6cf4aac88ecb6d Mon Sep 17 00:00:00 2001 From: shrohilla Date: Fri, 18 Nov 2022 01:15:39 +0530 Subject: [PATCH 15/15] fixing the unit tests --- .../KafkaProducerFactoryTest.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index af3c7bc8..e0d16686 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -9,6 +9,7 @@ using Avro.Generic; using Confluent.Kafka; +using Confluent.Kafka.SyncOverAsync; using Confluent.SchemaRegistry.Serdes; using Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests.Helpers; @@ -116,7 +117,8 @@ public void When_Avro_Schema_Is_Provided_Should_Create_GenericRecord_Listener() Assert.IsType>(producer); var typedProducer = (KafkaProducer)producer; Assert.NotNull(typedProducer.ValueSerializer); - Assert.IsType>(typedProducer.ValueSerializer); + //Assert.IsType>(typedProducer.ValueSerializer); + Assert.IsType>(typedProducer.ValueSerializer); } @@ -142,7 +144,7 @@ public void When_Value_Type_Is_Specific_Record_Should_Create_SpecificRecord_List Assert.IsType>(producer); var typedProducer = (KafkaProducer)producer; Assert.NotNull(typedProducer.ValueSerializer); - Assert.IsType>(typedProducer.ValueSerializer); + Assert.IsType>(typedProducer.ValueSerializer); } [Fact]