Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6543732
fixing the producer perf for in-proc
shrohilla Oct 12, 2022
34a9c75
Merge branch 'dev' into shrohilla/producer-perf
shrohilla Oct 12, 2022
0f31806
adding the support for binders
shrohilla Oct 27, 2022
1cfdc46
Merge branch 'shrohilla/producer-perf' of github.com:Azure/azure-func…
shrohilla Oct 27, 2022
87ed0b0
Merge branch 'dev' into shrohilla/producer-perf
shrohilla Oct 27, 2022
8e9aab7
fixing the regression issue
shrohilla Nov 5, 2022
53a7dfc
Merge branch 'dev' into shrohilla/producer-perf
shrohilla Nov 5, 2022
510c188
Merge branch 'dev' into shrohilla/producer-perf
shrohilla Nov 7, 2022
da69f5e
adding the disposable to dispose of all the events in the collectors
shrohilla Nov 7, 2022
e943b22
Merge branch 'shrohilla/producer-perf' of github.com:Azure/azure-func…
shrohilla Nov 7, 2022
d401e33
adding the EventOrder
shrohilla Nov 12, 2022
66b38ba
adding the fault case
shrohilla Nov 12, 2022
ec8ebee
boosting the producer performance
shrohilla Nov 16, 2022
6edda93
removing the app setting config variable
shrohilla Nov 17, 2022
d69baa4
reverting the change of Enable Idempotency
shrohilla Nov 17, 2022
413579e
refactoring the code and removing unnecessary comments
shrohilla Nov 17, 2022
470c810
changing the converter
shrohilla Nov 17, 2022
34eca5c
adding the unit tests covering the converter cases
shrohilla Nov 17, 2022
dae3bc5
optimizing the code
shrohilla Nov 17, 2022
c3443c9
fixing the regression
shrohilla Nov 17, 2022
35a8676
fixing the unit tests
shrohilla Nov 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal class CollectorValueProvider : IValueProvider
internal class CollectorValueProvider : IValueBinder
{
private readonly KafkaProducerEntity entity;
private readonly object value;
Expand Down Expand Up @@ -35,6 +36,12 @@ public Task<object> GetValueAsync()
return Task.FromResult(value);
}

/// <summary>
/// Invoked by the WebJobs runtime after the function returns; flushes the bound
/// collector so buffered events are actually produced to Kafka.
/// </summary>
/// <param name="value">The final bound value (unused; the collector held in <c>this.value</c> is flushed).</param>
/// <param name="cancellationToken">Token forwarded to the collector's FlushAsync.</param>
public Task SetValueAsync(object value, CancellationToken cancellationToken)
{
    // The collector type exposes FlushAsync(CancellationToken); invoke it via
    // reflection and return the resulting Task directly so the runtime awaits
    // the flush. The previous code wrapped the Task in Task.FromResult, which
    // completed immediately, never observed flush failures, and let the
    // invocation finish before the flush did. Forward the token instead of
    // Missing.Value so cancellation flows through.
    var flushMethod = this.value.GetType().GetMethod("FlushAsync");
    return (Task)flushMethod.Invoke(this.value, new object[] { cancellationToken });
}

/// <summary>
/// Human-readable representation of this binding for invocation logs:
/// the target Kafka topic.
/// </summary>
public string ToInvokeString() => this.entity.Topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.Output
{
    /// <summary>
    /// Converts an input value into an output value; used by the Kafka output
    /// binding to normalize collected items into event data before producing.
    /// </summary>
    /// <typeparam name="TRequest">The input type to convert from.</typeparam>
    /// <typeparam name="TResponse">The output type to convert to.</typeparam>
    // NOTE: type parameters renamed Request/Response -> TRequest/TResponse per
    // the .NET naming convention (CA1715); generic parameters are positional,
    // so existing implementations and usages are unaffected.
    public interface IConverter<TRequest, TResponse>
    {
        /// <summary>Converts <paramref name="request"/> into the response type.</summary>
        TResponse Convert(TRequest request);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,15 @@ public interface IKafkaProducer : IDisposable
/// Produces a Kafka message
/// </summary>
Task ProduceAsync(string topic, object item);

/// <summary>
/// Produces a Kafka message
/// </summary>
void Produce(string topic, object item);

/// <summary>
/// Flushing all added messages to broker
/// </summary>
void Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,72 @@ public KafkaProducer(

public async Task ProduceAsync(string topic, object item)
{
if (item == null)
ValidateItem(item);
IKafkaEventData actualItem = GetItem(item);
Message<TKey, TValue> msg = BuildMessage(item, actualItem);
string topicUsed = FindTopic(topic, actualItem);

try
{
throw new ArgumentNullException(nameof(item));
var deliveryResult = await this.producer.ProduceAsync(topicUsed, msg);

this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset);
}
catch (ProduceException<TKey, TValue> produceException)
{
logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString());
throw;
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error producing into {topic}", topicUsed);
throw;
}
}

var actualItem = (IKafkaEventData)item;
if (actualItem == null)
public void Produce(string topic, object item)
{
ValidateItem(item);
IKafkaEventData actualItem = GetItem(item);
Message<TKey, TValue> msg = BuildMessage(item, actualItem);
string topicUsed = FindTopic(topic, actualItem);

try
{
throw new ArgumentException($"Message value is not of the expected type. Expected: {typeof(KafkaEventData<TKey, TValue>).Name}. Actual: {item.GetType().Name}");
logger.LogInformation("in Produce method");
this.producer.Produce(topicUsed, msg,
deliveryResult => this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset));
}
catch (ProduceException<TKey, TValue> produceException)
{
logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString());
throw;
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error producing into {topic}", topicUsed);
throw;
}
}

if (actualItem.Value == null)
/// <summary>
/// Flushes the underlying Kafka producer, waiting for outstanding delivery
/// attempts to complete (delegates to the Confluent producer's Flush).
/// </summary>
public void Flush() => this.producer.Flush();

private static IKafkaEventData GetItem(object item)
{
IKafkaEventData actualItem = (IKafkaEventData)item;
if (actualItem == null)
{
throw new ArgumentException("Message value was not defined");
throw new ArgumentException($"Message value is not of the expected type. Expected: {typeof(KafkaEventData<TKey, TValue>).Name}. Actual: {item.GetType().Name}");
}

var msg = MessageBuilder.BuildFrom(actualItem);
return actualItem;
}

private static string FindTopic(string topic, IKafkaEventData actualItem)
{
var topicUsed = topic;
if (string.IsNullOrEmpty(topic))
{
Expand All @@ -84,26 +132,29 @@ public async Task ProduceAsync(string topic, object item)
}
}

try
{
var deliveryResult = await this.producer.ProduceAsync(topicUsed, msg);
return topicUsed;
}

this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset);
}
catch (ProduceException<TKey, TValue> produceException)
private Message<TKey, TValue> BuildMessage(object item, IKafkaEventData actualItem)
{
if (actualItem.Value == null)
{
logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString());
throw;
throw new ArgumentException("Message value was not defined");
}
catch (Exception ex)
return MessageBuilder.BuildFrom(actualItem);
}

/// <summary>
/// Guards against null payloads so the producer never tries to build a
/// message from nothing.
/// </summary>
/// <param name="item">The candidate message payload.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="item"/> is null.</exception>
// NOTE: the diff view interleaved two stale (removed) logging lines here;
// the reconstructed method below is the intended new implementation.
private static void ValidateItem(object item)
{
    if (item == null)
    {
        throw new ArgumentNullException(nameof(item));
    }
}

public void Dispose()
{
this.producer?.Flush();
this.producer?.Dispose();
this.producer = null;
GC.SuppressFinalize(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.Azure.WebJobs.Extensions.Kafka.Output;
using Newtonsoft.Json.Linq;

[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")]
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal class KafkaProducerAsyncCollector<T> : IAsyncCollector<T>
internal class KafkaProducerAsyncCollector<T> : IAsyncCollector<T>, IDisposable
{
private readonly KafkaProducerEntity entity;
private readonly Guid functionInstanceId;
private List<object> eventList = new List<object>();
private IConverter<T, object> kafkaEventDataConverter = new KafkaEventDataConverter();

public KafkaProducerAsyncCollector(KafkaProducerEntity entity, Guid functionInstanceId)
{
Expand All @@ -34,100 +40,112 @@ public Task AddAsync(T item, CancellationToken cancellationToken)
throw new InvalidOperationException("Cannot produce a null message instance.");
}

object messageToSend = item;

if (item.GetType() == typeof(string))
{
messageToSend = ConvertToKafkaEventData(item);
}
eventList.Add(kafkaEventDataConverter.Convert(item));
return Task.CompletedTask;
}

if (item.GetType() == typeof(byte[]))
/// <summary>
/// Sends all buffered events to the Kafka producer entity.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the send operation.</param>
// NOTE: the diff view interleaved stale (removed) lines here; the
// reconstructed method below is the intended new implementation.
// NOTE(review): AddAsync appends to eventList without taking this lock —
// the snapshot-under-lock here only helps if all mutators lock too; confirm
// and align the locking in AddAsync.
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
    // Snapshot and clear under the lock so a concurrent flush cannot
    // double-send or drop events.
    List<object> eventObjList;
    lock (eventList)
    {
        eventObjList = new List<object>(eventList);
        eventList.Clear();
    }

    await entity.SendAndCreateEntityIfNotExistsAsync(eventObjList, functionInstanceId, cancellationToken);
}

private object ConvertToKafkaEventData(T item)
/// <summary>
/// Flushes any buffered events when the collector is disposed.
/// </summary>
// FIX: the previous implementation was `async void Dispose()` — Dispose
// returned before the flush completed and any flush exception was
// unobservable (crashing the process on some runtimes). IDisposable.Dispose
// must be synchronous, so block until the drain finishes and let failures
// propagate to the caller.
public void Dispose()
{
    this.FlushAsync().GetAwaiter().GetResult();
}

private object BuildKafkaDataEvent(T item)
private class KafkaEventDataConverter : IConverter<T, object>
{
JObject dataObj = JObject.Parse(item.ToString());
if (dataObj == null)
public object Convert(T item)
{
return new KafkaEventData<T>(item);
if (item.GetType() == typeof(string))
{
return ConvertToKafkaEventData(item);
}
if (item.GetType() == typeof(byte[]))
{
return new KafkaEventData<T>(item);
}
return item;
}

if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic")
&& dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers"))
private object ConvertToKafkaEventData(T item)
{
return BuildKafkaEventData(dataObj);
try
{
return BuildKafkaDataEvent(item);
}
catch (Exception)
{
return new KafkaEventData<T>(item);
}
}

return new KafkaEventData<T>(item);
}

private object BuildKafkaEventData(JObject dataObj)
{
if (dataObj["Key"] != null)
private object BuildKafkaDataEvent(T item)
{
return BuildKafkaEventDataForKeyValue(dataObj);
JObject dataObj = JObject.Parse(item.ToString());
if (dataObj == null)
{
return new KafkaEventData<T>(item);
}
if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic")
&& dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers"))
{
return BuildKafkaEventData(dataObj);
}
return new KafkaEventData<T>(item);
}
else

private object BuildKafkaEventData(JObject dataObj)
{
if (dataObj["Key"] != null)
{
return BuildKafkaEventDataForKeyValue(dataObj);
}
return BuildKafkaEventDataForValue(dataObj);
}
}

private static object BuildKafkaEventDataForValue(JObject dataObj)
{
KafkaEventData<string> messageToSend = new KafkaEventData<string>((string)dataObj["Value"]);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList)
private static object BuildKafkaEventDataForValue(JObject dataObj)
{
messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
KafkaEventData<string> messageToSend = new KafkaEventData<string>((string)dataObj["Value"]);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList)
{
messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
}
return messageToSend;
}
return messageToSend;
}

private static object BuildKafkaEventDataForKeyValue(JObject dataObj)
{
string value = null;
if (dataObj["Value"] != null && dataObj["Value"].Type.ToString().Equals("Object"))
private static object BuildKafkaEventDataForKeyValue(JObject dataObj)
{
value = Newtonsoft.Json.JsonConvert.SerializeObject(dataObj["Value"]);
} else
{
value = (string)dataObj["Value"];
}
KafkaEventData<string, string> messageToSend = new KafkaEventData<string, string>((string)dataObj["Key"], value);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList)
{
messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
string value = null;
if (dataObj["Value"] != null && dataObj["Value"].Type.ToString().Equals("Object"))
{
value = Newtonsoft.Json.JsonConvert.SerializeObject(dataObj["Value"]);
}
else
{
value = (string)dataObj["Value"];
}
KafkaEventData<string, string> messageToSend = new KafkaEventData<string, string>((string)dataObj["Key"], value);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList)
{
messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
}
return messageToSend;
}
return messageToSend;
}

public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// Batching not supported.
return Task.FromResult(0);
}
}
}
Loading