From 1e64f81b214dcd2268addc81492d67f34b27155e Mon Sep 17 00:00:00 2001
From: Henrique Graca <999396+hjgraca@users.noreply.github.com>
Date: Fri, 6 Jun 2025 10:07:55 +0100
Subject: [PATCH 01/35] feat(kafka): add Avro serialization support and
implement Kafka event handling
---
libraries/AWS.Lambda.Powertools.sln | 30 +++
.../AWS.Lambda.Powertools.Kafka.csproj | 20 ++
.../AWS.Lambda.Powertools.Kafka/KafkaEvent.cs | 23 ++
.../PowertoolsKafkaAvroSerializer.cs | 216 ++++++++++++++++++
.../src/AWS.Lambda.Powertools.Kafka/Readme.md | 1 +
libraries/src/Directory.Packages.props | 1 +
.../AWS.Lambda.Powertools.Kafka.Tests.csproj | 57 +++++
.../Powertools/Kafka/Tests/AvroProduct.cs | 86 +++++++
.../Avro/AvroProduct.avsc | 10 +
.../Avro/HandlerTests.cs | 140 ++++++++++++
.../PowertoolsKafkaAvroSerializerTests.cs | 49 ++++
.../Avro/kafka-avro-event.json | 51 +++++
.../Json/kafka-json-event.json | 50 ++++
.../PowertoolsLambdaKafkaSerializerTests.cs | 12 +
.../Protobuf/kafka-protobuf-event.json | 51 +++++
.../Readme.md | 17 ++
16 files changed, 814 insertions(+)
create mode 100644 libraries/src/AWS.Lambda.Powertools.Kafka/AWS.Lambda.Powertools.Kafka.csproj
create mode 100644 libraries/src/AWS.Lambda.Powertools.Kafka/KafkaEvent.cs
create mode 100644 libraries/src/AWS.Lambda.Powertools.Kafka/PowertoolsKafkaAvroSerializer.cs
create mode 100644 libraries/src/AWS.Lambda.Powertools.Kafka/Readme.md
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/AWS.Lambda.Powertools.Kafka.Tests.csproj
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/AWS/Lambda/Powertools/Kafka/Tests/AvroProduct.cs
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/AvroProduct.avsc
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/HandlerTests.cs
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/PowertoolsKafkaAvroSerializerTests.cs
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Avro/kafka-avro-event.json
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Json/kafka-json-event.json
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/PowertoolsLambdaKafkaSerializerTests.cs
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Protobuf/kafka-protobuf-event.json
create mode 100644 libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Readme.md
diff --git a/libraries/AWS.Lambda.Powertools.sln b/libraries/AWS.Lambda.Powertools.sln
index c3056d147..cc18e136a 100644
--- a/libraries/AWS.Lambda.Powertools.sln
+++ b/libraries/AWS.Lambda.Powertools.sln
@@ -113,6 +113,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Event
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore", "src\AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore\AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore.csproj", "{8A22F22E-D10A-4897-A89A-DC76C267F6BB}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka", "src\AWS.Lambda.Powertools.Kafka\AWS.Lambda.Powertools.Kafka.csproj", "{5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka.Tests", "tests\AWS.Lambda.Powertools.Kafka.Tests\AWS.Lambda.Powertools.Kafka.Tests.csproj", "{FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -618,6 +622,30 @@ Global
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x64.Build.0 = Release|Any CPU
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x86.ActiveCfg = Release|Any CPU
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x86.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x64.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x86.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x64.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x64.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x86.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x86.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x64.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x86.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x64.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x64.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x86.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
@@ -671,5 +699,7 @@ Global
{F4B8D5AF-D3CA-4910-A14D-E5BAEF0FD1DE} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
{281F7EB5-ACE5-458F-BC88-46A8899DF3BA} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
{8A22F22E-D10A-4897-A89A-DC76C267F6BB} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645} = {1CFF5568-8486-475F-81F6-06105C437528}
EndGlobalSection
EndGlobal
diff --git a/libraries/src/AWS.Lambda.Powertools.Kafka/AWS.Lambda.Powertools.Kafka.csproj b/libraries/src/AWS.Lambda.Powertools.Kafka/AWS.Lambda.Powertools.Kafka.csproj
new file mode 100644
index 000000000..476f85faf
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Kafka/AWS.Lambda.Powertools.Kafka.csproj
@@ -0,0 +1,20 @@
+
+
+
+
+ AWS.Lambda.Powertools.Kafka
+ Powertools for AWS Lambda (.NET) - Kafka consumer package.
+ AWS.Lambda.Powertools.Kafka
+ AWS.Lambda.Powertools.Kafka
+ net8.0
+ false
+ enable
+ enable
+
+
+
+
+
+
+
+
diff --git a/libraries/src/AWS.Lambda.Powertools.Kafka/KafkaEvent.cs b/libraries/src/AWS.Lambda.Powertools.Kafka/KafkaEvent.cs
new file mode 100644
index 000000000..db6fcbd35
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Kafka/KafkaEvent.cs
@@ -0,0 +1,23 @@
+using System.Text.Json;
+
+namespace AWS.Lambda.Powertools.Kafka;
+
+public class KafkaEvent<T>
+{
+ public string EventSource { get; set; }
+ public string EventSourceArn { get; set; }
+ public string BootstrapServers { get; set; }
+ public Dictionary<string, List<KafkaRecord<T>>> Records { get; set; } = new();
+}
+
+public class KafkaRecord<T>
+{
+ public string Topic { get; set; }
+ public int Partition { get; set; }
+ public long Offset { get; set; }
+ public long Timestamp { get; set; }
+ public string TimestampType { get; set; }
+ public string Key { get; set; }
+ public T Value { get; set; }
+ public Dictionary<string, string> Headers { get; set; }
+}
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Kafka/PowertoolsKafkaAvroSerializer.cs b/libraries/src/AWS.Lambda.Powertools.Kafka/PowertoolsKafkaAvroSerializer.cs
new file mode 100644
index 000000000..9f3d1a2ab
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Kafka/PowertoolsKafkaAvroSerializer.cs
@@ -0,0 +1,216 @@
+using Amazon.Lambda.Core;
+using Avro;
+using Avro.IO;
+using Avro.Specific;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection;
+using System.Text;
+using System.Text.Json;
+
+namespace AWS.Lambda.Powertools.Kafka;
+
+public class PowertoolsKafkaAvroSerializer : ILambdaSerializer
+{
+ private readonly JsonSerializerOptions _jsonOptions = new()
+ {
+ PropertyNameCaseInsensitive = true
+ };
+
+ public T Deserialize<T>(Stream requestStream)
+ {
+ using var reader = new StreamReader(requestStream);
+ var json = reader.ReadToEnd();
+
+ var targetType = typeof(T);
+
+ if (targetType.IsGenericType && targetType.GetGenericTypeDefinition() == typeof(KafkaEvent<>))
+ {
+ var payloadType = targetType.GetGenericArguments()[0];
+ using var document = JsonDocument.Parse(json);
+ var root = document.RootElement;
+
+ // Create the correctly typed instance
+ var typedEvent = Activator.CreateInstance(targetType);
+
+ // Set basic properties
+ if (root.TryGetProperty("eventSource", out var eventSource))
+ targetType.GetProperty("EventSource").SetValue(typedEvent, eventSource.GetString());
+
+ if (root.TryGetProperty("eventSourceArn", out var eventSourceArn))
+ targetType.GetProperty("EventSourceArn").SetValue(typedEvent, eventSourceArn.GetString());
+
+ if (root.TryGetProperty("bootstrapServers", out var bootstrapServers))
+ targetType.GetProperty("BootstrapServers").SetValue(typedEvent, bootstrapServers.GetString());
+
+ // Get the schema for Avro deserialization
+ Schema schema = GetAvroSchema(payloadType);
+
+ // Create records dictionary with correct generic type
+ var dictType = typeof(Dictionary<,>).MakeGenericType(
+ typeof(string),
+ typeof(List<>).MakeGenericType(typeof(KafkaRecord<>).MakeGenericType(payloadType))
+ );
+ var records = Activator.CreateInstance(dictType);
+ var dictAddMethod = dictType.GetMethod("Add");
+
+ if (root.TryGetProperty("records", out var recordsElement))
+ {
+ foreach (var topicPartition in recordsElement.EnumerateObject())
+ {
+ string topicName = topicPartition.Name;
+
+ // Create list of records with correct generic type
+ var listType = typeof(List<>).MakeGenericType(typeof(KafkaRecord<>).MakeGenericType(payloadType));
+ var recordsList = Activator.CreateInstance(listType);
+ var listAddMethod = listType.GetMethod("Add");
+
+ foreach (var recordElement in topicPartition.Value.EnumerateArray())
+ {
+ // Create record instance of correct type
+ var recordType = typeof(KafkaRecord<>).MakeGenericType(payloadType);
+ var record = Activator.CreateInstance(recordType);
+
+ // Set basic properties
+ SetProperty(recordType, record, "Topic", recordElement, "topic");
+ SetProperty(recordType, record, "Partition", recordElement, "partition");
+ SetProperty(recordType, record, "Offset", recordElement, "offset");
+ SetProperty(recordType, record, "Timestamp", recordElement, "timestamp");
+ SetProperty(recordType, record, "TimestampType", recordElement, "timestampType");
+
+ // Handle key - base64 decode if present
+ if (recordElement.TryGetProperty("key", out var keyElement) &&
+ keyElement.ValueKind == JsonValueKind.String)
+ {
+ string base64Key = keyElement.GetString();
+ recordType.GetProperty("Key").SetValue(record, base64Key);
+
+ // Base64 decode the key
+ if (!string.IsNullOrEmpty(base64Key))
+ {
+ try
+ {
+ byte[] keyBytes = Convert.FromBase64String(base64Key);
+ string decodedKey = Encoding.UTF8.GetString(keyBytes);
+ recordType.GetProperty("Key").SetValue(record, decodedKey);
+ }
+ catch (Exception)
+ {
+ // If decoding fails, leave it as is
+ }
+ }
+ }
+
+ // Handle Avro value
+ if (recordElement.TryGetProperty("value", out var value) &&
+ value.ValueKind == JsonValueKind.String)
+ {
+ string base64Value = value.GetString();
+ // recordType.GetProperty("Value").SetValue(record, base64Value);
+
+ // Deserialize Avro data
+ try
+ {
+ var deserializedValue = DeserializeAvroValue(base64Value, schema);
+ recordType.GetProperty("Value").SetValue(record, deserializedValue);
+ }
+ catch (Exception ex)
+ {
+ throw new Exception($"Failed to deserialize Avro data: {ex.Message}", ex);
+ }
+ }
+
+ if (recordElement.TryGetProperty("headers", out var headersElement) &&
+ headersElement.ValueKind == JsonValueKind.Array)
+ {
+ var decodedHeaders = new Dictionary<string, string>();
+
+ foreach (var headerObj in headersElement.EnumerateArray())
+ {
+ foreach (var header in headerObj.EnumerateObject())
+ {
+ string headerKey = header.Name;
+ if (header.Value.ValueKind == JsonValueKind.Array)
+ {
+ // Convert integer array to byte array
+ byte[] headerBytes = new byte[header.Value.GetArrayLength()];
+ int i = 0;
+ foreach (var byteVal in header.Value.EnumerateArray())
+ {
+ headerBytes[i++] = (byte)byteVal.GetInt32();
+ }
+
+ // Decode as UTF-8 string
+ string headerValue = Encoding.UTF8.GetString(headerBytes);
+ decodedHeaders[headerKey] = headerValue;
+ }
+ }
+ }
+
+ recordType.GetProperty("Headers").SetValue(record, decodedHeaders);
+ }
+
+ // Add to records list
+ listAddMethod.Invoke(recordsList, new[] { record });
+ }
+
+ // Add topic records to dictionary
+ dictAddMethod.Invoke(records, new[] { topicName, recordsList });
+ }
+ }
+
+ targetType.GetProperty("Records").SetValue(typedEvent, records);
+ return (T)typedEvent;
+ }
+
+ return JsonSerializer.Deserialize<T>(json, _jsonOptions);
+ }
+
+
+ private void SetProperty(Type type, object instance, string propertyName,
+ JsonElement element, string jsonPropertyName)
+ {
+ if (!element.TryGetProperty(jsonPropertyName, out var jsonValue) ||
+ jsonValue.ValueKind == JsonValueKind.Null)
+ return;
+
+ var property = type.GetProperty(propertyName);
+ var propertyType = property.PropertyType;
+
+ object value;
+ if (propertyType == typeof(int)) value = jsonValue.GetInt32();
+ else if (propertyType == typeof(long)) value = jsonValue.GetInt64();
+ else if (propertyType == typeof(double)) value = jsonValue.GetDouble();
+ else if (propertyType == typeof(string)) value = jsonValue.GetString();
+ else return;
+
+ property.SetValue(instance, value);
+ }
+
+ private Schema GetAvroSchema(Type payloadType)
+ {
+ var schemaField = payloadType.GetField("_SCHEMA",
+ BindingFlags.Public | BindingFlags.Static);
+
+ if (schemaField == null)
+ throw new InvalidOperationException($"No Avro schema found for type {payloadType.Name}");
+
+ return schemaField.GetValue(null) as Schema;
+ }
+
+ private object DeserializeAvroValue(string base64Value, Schema schema)
+ {
+ byte[] avroBytes = Convert.FromBase64String(base64Value);
+ using var stream = new MemoryStream(avroBytes);
+ var decoder = new BinaryDecoder(stream);
+ var reader = new SpecificDatumReader