diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index ba06d4429..206a028f8 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -39,6 +39,9 @@ jobs:
with:
languages: ${{ matrix.language }}
+ - name: Install global tools
+ run: dotnet tool install --global Apache.Avro.Tools
+
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
diff --git a/.github/workflows/examples-tests.yml b/.github/workflows/examples-tests.yml
index 83522084d..77b322e18 100644
--- a/.github/workflows/examples-tests.yml
+++ b/.github/workflows/examples-tests.yml
@@ -33,6 +33,9 @@ jobs:
- name: Install dependencies
run: dotnet restore
+ - name: Install global tools
+ run: dotnet tool install --global Apache.Avro.Tools
+
- name: Build
run: dotnet build --configuration Release --no-restore /tl
diff --git a/examples/Kafka/Avro/src/Avro.csproj b/examples/Kafka/Avro/src/Avro.csproj
new file mode 100644
index 000000000..05314f2fb
--- /dev/null
+++ b/examples/Kafka/Avro/src/Avro.csproj
@@ -0,0 +1,35 @@
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ true
+ Lambda
+
+ true
+
+ true
+
+ Avro.Example
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/CustomerProfile.avsc b/examples/Kafka/Avro/src/CustomerProfile.avsc
new file mode 100644
index 000000000..bf8cc090c
--- /dev/null
+++ b/examples/Kafka/Avro/src/CustomerProfile.avsc
@@ -0,0 +1,46 @@
+{
+ "type": "record",
+ "name": "CustomerProfile",
+ "namespace": "com.example",
+ "fields": [
+ {"name": "user_id", "type": "string"},
+ {"name": "full_name", "type": "string"},
+ {"name": "email", "type": {
+ "type": "record",
+ "name": "EmailAddress",
+ "fields": [
+ {"name": "address", "type": "string"},
+ {"name": "verified", "type": "boolean"},
+ {"name": "primary", "type": "boolean"}
+ ]
+ }},
+ {"name": "age", "type": "int"},
+ {"name": "address", "type": {
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "country", "type": "string"},
+ {"name": "zip_code", "type": "string"}
+ ]
+ }},
+ {"name": "phone_numbers", "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "PhoneNumber",
+ "fields": [
+ {"name": "number", "type": "string"},
+ {"name": "type", "type": {"type": "enum", "name": "PhoneType", "symbols": ["HOME", "WORK", "MOBILE"]}}
+ ]
+ }
+ }},
+ {"name": "preferences", "type": {
+ "type": "map",
+ "values": "string"
+ }},
+ {"name": "account_status", "type": {"type": "enum", "name": "AccountStatus", "symbols": ["ACTIVE", "INACTIVE", "SUSPENDED"]}}
+ ]
+}
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/Function.cs b/examples/Kafka/Avro/src/Function.cs
new file mode 100644
index 000000000..6ca9ebdb5
--- /dev/null
+++ b/examples/Kafka/Avro/src/Function.cs
@@ -0,0 +1,21 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.RuntimeSupport;
+using AWS.Lambda.Powertools.Kafka;
+using AWS.Lambda.Powertools.Kafka.Avro;
+using AWS.Lambda.Powertools.Logging;
+using com.example;
+
+string Handler(ConsumerRecords records, ILambdaContext context)
+{
+ foreach (var record in records)
+ {
+ Logger.LogInformation("Record Value: {@record}", record.Value);
+ }
+
+ return "Processed " + records.Count() + " records";
+}
+
+await LambdaBootstrapBuilder.Create((Func, ILambdaContext, string>?)Handler,
+ new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
+ .Build()
+ .RunAsync();
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/Generated/com/example/AccountStatus.cs b/examples/Kafka/Avro/src/Generated/com/example/AccountStatus.cs
new file mode 100644
index 000000000..c7809f518
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/AccountStatus.cs
@@ -0,0 +1,23 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public enum AccountStatus
+ {
+ ACTIVE,
+ INACTIVE,
+ SUSPENDED,
+ }
+}
diff --git a/examples/Kafka/Avro/src/Generated/com/example/Address.cs b/examples/Kafka/Avro/src/Generated/com/example/Address.cs
new file mode 100644
index 000000000..e2053e0f2
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/Address.cs
@@ -0,0 +1,115 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public partial class Address : global::Avro.Specific.ISpecificRecord
+ {
+ public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"st" +
+ "reet\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"s" +
+ "tring\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"zip_code\",\"type\":\"string\"}]}" +
+ "");
+ private string _street;
+ private string _city;
+ private string _state;
+ private string _country;
+ private string _zip_code;
+ public virtual global::Avro.Schema Schema
+ {
+ get
+ {
+ return Address._SCHEMA;
+ }
+ }
+ public string street
+ {
+ get
+ {
+ return this._street;
+ }
+ set
+ {
+ this._street = value;
+ }
+ }
+ public string city
+ {
+ get
+ {
+ return this._city;
+ }
+ set
+ {
+ this._city = value;
+ }
+ }
+ public string state
+ {
+ get
+ {
+ return this._state;
+ }
+ set
+ {
+ this._state = value;
+ }
+ }
+ public string country
+ {
+ get
+ {
+ return this._country;
+ }
+ set
+ {
+ this._country = value;
+ }
+ }
+ public string zip_code
+ {
+ get
+ {
+ return this._zip_code;
+ }
+ set
+ {
+ this._zip_code = value;
+ }
+ }
+ public virtual object Get(int fieldPos)
+ {
+ switch (fieldPos)
+ {
+ case 0: return this.street;
+ case 1: return this.city;
+ case 2: return this.state;
+ case 3: return this.country;
+ case 4: return this.zip_code;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
+ };
+ }
+ public virtual void Put(int fieldPos, object fieldValue)
+ {
+ switch (fieldPos)
+ {
+ case 0: this.street = (System.String)fieldValue; break;
+ case 1: this.city = (System.String)fieldValue; break;
+ case 2: this.state = (System.String)fieldValue; break;
+ case 3: this.country = (System.String)fieldValue; break;
+ case 4: this.zip_code = (System.String)fieldValue; break;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
+ };
+ }
+ }
+}
diff --git a/examples/Kafka/Avro/src/Generated/com/example/CustomerProfile.cs b/examples/Kafka/Avro/src/Generated/com/example/CustomerProfile.cs
new file mode 100644
index 000000000..15d62095d
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/CustomerProfile.cs
@@ -0,0 +1,154 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public partial class CustomerProfile : global::Avro.Specific.ISpecificRecord
+ {
+ public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerProfile"",""namespace"":""com.example"",""fields"":[{""name"":""user_id"",""type"":""string""},{""name"":""full_name"",""type"":""string""},{""name"":""email"",""type"":{""type"":""record"",""name"":""EmailAddress"",""namespace"":""com.example"",""fields"":[{""name"":""address"",""type"":""string""},{""name"":""verified"",""type"":""boolean""},{""name"":""primary"",""type"":""boolean""}]}},{""name"":""age"",""type"":""int""},{""name"":""address"",""type"":{""type"":""record"",""name"":""Address"",""namespace"":""com.example"",""fields"":[{""name"":""street"",""type"":""string""},{""name"":""city"",""type"":""string""},{""name"":""state"",""type"":""string""},{""name"":""country"",""type"":""string""},{""name"":""zip_code"",""type"":""string""}]}},{""name"":""phone_numbers"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""PhoneNumber"",""namespace"":""com.example"",""fields"":[{""name"":""number"",""type"":""string""},{""name"":""type"",""type"":{""type"":""enum"",""name"":""PhoneType"",""namespace"":""com.example"",""symbols"":[""HOME"",""WORK"",""MOBILE""]}}]}}},{""name"":""preferences"",""type"":{""type"":""map"",""values"":""string""}},{""name"":""account_status"",""type"":{""type"":""enum"",""name"":""AccountStatus"",""namespace"":""com.example"",""symbols"":[""ACTIVE"",""INACTIVE"",""SUSPENDED""]}}]}");
+ private string _user_id;
+ private string _full_name;
+ private com.example.EmailAddress _email;
+ private int _age;
+ private com.example.Address _address;
+ private IList _phone_numbers;
+ private IDictionary _preferences;
+ private com.example.AccountStatus _account_status;
+ public virtual global::Avro.Schema Schema
+ {
+ get
+ {
+ return CustomerProfile._SCHEMA;
+ }
+ }
+ public string user_id
+ {
+ get
+ {
+ return this._user_id;
+ }
+ set
+ {
+ this._user_id = value;
+ }
+ }
+ public string full_name
+ {
+ get
+ {
+ return this._full_name;
+ }
+ set
+ {
+ this._full_name = value;
+ }
+ }
+ public com.example.EmailAddress email
+ {
+ get
+ {
+ return this._email;
+ }
+ set
+ {
+ this._email = value;
+ }
+ }
+ public int age
+ {
+ get
+ {
+ return this._age;
+ }
+ set
+ {
+ this._age = value;
+ }
+ }
+ public com.example.Address address
+ {
+ get
+ {
+ return this._address;
+ }
+ set
+ {
+ this._address = value;
+ }
+ }
+ public IList phone_numbers
+ {
+ get
+ {
+ return this._phone_numbers;
+ }
+ set
+ {
+ this._phone_numbers = value;
+ }
+ }
+ public IDictionary preferences
+ {
+ get
+ {
+ return this._preferences;
+ }
+ set
+ {
+ this._preferences = value;
+ }
+ }
+ public com.example.AccountStatus account_status
+ {
+ get
+ {
+ return this._account_status;
+ }
+ set
+ {
+ this._account_status = value;
+ }
+ }
+ public virtual object Get(int fieldPos)
+ {
+ switch (fieldPos)
+ {
+ case 0: return this.user_id;
+ case 1: return this.full_name;
+ case 2: return this.email;
+ case 3: return this.age;
+ case 4: return this.address;
+ case 5: return this.phone_numbers;
+ case 6: return this.preferences;
+ case 7: return this.account_status;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
+ };
+ }
+ public virtual void Put(int fieldPos, object fieldValue)
+ {
+ switch (fieldPos)
+ {
+ case 0: this.user_id = (System.String)fieldValue; break;
+ case 1: this.full_name = (System.String)fieldValue; break;
+ case 2: this.email = (com.example.EmailAddress)fieldValue; break;
+ case 3: this.age = (System.Int32)fieldValue; break;
+ case 4: this.address = (com.example.Address)fieldValue; break;
+ case 5: this.phone_numbers = (IList)fieldValue; break;
+ case 6: this.preferences = (IDictionary)fieldValue; break;
+ case 7: this.account_status = (com.example.AccountStatus)fieldValue; break;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
+ };
+ }
+ }
+}
diff --git a/examples/Kafka/Avro/src/Generated/com/example/EmailAddress.cs b/examples/Kafka/Avro/src/Generated/com/example/EmailAddress.cs
new file mode 100644
index 000000000..4a25a6e0b
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/EmailAddress.cs
@@ -0,0 +1,86 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public partial class EmailAddress : global::Avro.Specific.ISpecificRecord
+ {
+ public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"EmailAddress\",\"namespace\":\"com.example\",\"fields\":[{\"name" +
+ "\":\"address\",\"type\":\"string\"},{\"name\":\"verified\",\"type\":\"boolean\"},{\"name\":\"prima" +
+ "ry\",\"type\":\"boolean\"}]}");
+ private string _address;
+ private bool _verified;
+ private bool _primary;
+ public virtual global::Avro.Schema Schema
+ {
+ get
+ {
+ return EmailAddress._SCHEMA;
+ }
+ }
+ public string address
+ {
+ get
+ {
+ return this._address;
+ }
+ set
+ {
+ this._address = value;
+ }
+ }
+ public bool verified
+ {
+ get
+ {
+ return this._verified;
+ }
+ set
+ {
+ this._verified = value;
+ }
+ }
+ public bool primary
+ {
+ get
+ {
+ return this._primary;
+ }
+ set
+ {
+ this._primary = value;
+ }
+ }
+ public virtual object Get(int fieldPos)
+ {
+ switch (fieldPos)
+ {
+ case 0: return this.address;
+ case 1: return this.verified;
+ case 2: return this.primary;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
+ };
+ }
+ public virtual void Put(int fieldPos, object fieldValue)
+ {
+ switch (fieldPos)
+ {
+ case 0: this.address = (System.String)fieldValue; break;
+ case 1: this.verified = (System.Boolean)fieldValue; break;
+ case 2: this.primary = (System.Boolean)fieldValue; break;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
+ };
+ }
+ }
+}
diff --git a/examples/Kafka/Avro/src/Generated/com/example/PhoneNumber.cs b/examples/Kafka/Avro/src/Generated/com/example/PhoneNumber.cs
new file mode 100644
index 000000000..ea3d2b8ed
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/PhoneNumber.cs
@@ -0,0 +1,72 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public partial class PhoneNumber : global::Avro.Specific.ISpecificRecord
+ {
+ public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"PhoneNumber\",\"namespace\":\"com.example\",\"fields\":[{\"name\"" +
+ ":\"number\",\"type\":\"string\"},{\"name\":\"type\",\"type\":{\"type\":\"enum\",\"name\":\"PhoneTyp" +
+ "e\",\"namespace\":\"com.example\",\"symbols\":[\"HOME\",\"WORK\",\"MOBILE\"]}}]}");
+ private string _number;
+ private com.example.PhoneType _type;
+ public virtual global::Avro.Schema Schema
+ {
+ get
+ {
+ return PhoneNumber._SCHEMA;
+ }
+ }
+ public string number
+ {
+ get
+ {
+ return this._number;
+ }
+ set
+ {
+ this._number = value;
+ }
+ }
+ public com.example.PhoneType type
+ {
+ get
+ {
+ return this._type;
+ }
+ set
+ {
+ this._type = value;
+ }
+ }
+ public virtual object Get(int fieldPos)
+ {
+ switch (fieldPos)
+ {
+ case 0: return this.number;
+ case 1: return this.type;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
+ };
+ }
+ public virtual void Put(int fieldPos, object fieldValue)
+ {
+ switch (fieldPos)
+ {
+ case 0: this.number = (System.String)fieldValue; break;
+ case 1: this.type = (com.example.PhoneType)fieldValue; break;
+ default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
+ };
+ }
+ }
+}
diff --git a/examples/Kafka/Avro/src/Generated/com/example/PhoneType.cs b/examples/Kafka/Avro/src/Generated/com/example/PhoneType.cs
new file mode 100644
index 000000000..f592d8692
--- /dev/null
+++ b/examples/Kafka/Avro/src/Generated/com/example/PhoneType.cs
@@ -0,0 +1,23 @@
+// ------------------------------------------------------------------------------
+//
+// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
+// Changes to this file may cause incorrect behavior and will be lost if code
+// is regenerated
+//
+// ------------------------------------------------------------------------------
+namespace com.example
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using global::Avro;
+ using global::Avro.Specific;
+
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
+ public enum PhoneType
+ {
+ HOME,
+ WORK,
+ MOBILE,
+ }
+}
diff --git a/examples/Kafka/Avro/src/Readme.md b/examples/Kafka/Avro/src/Readme.md
new file mode 100644
index 000000000..23e64e8e2
--- /dev/null
+++ b/examples/Kafka/Avro/src/Readme.md
@@ -0,0 +1,131 @@
+# AWS Powertools for AWS Lambda .NET - Kafka Avro Example
+
+This project demonstrates how to use AWS Lambda Powertools for .NET with Amazon MSK (Managed Streaming for Kafka) to process events from Kafka topics.
+
+## Overview
+
+This example showcases a Lambda function that consumes messages from Kafka topics using the Avro serialization format.
+
+It uses the `AWS.Lambda.Powertools.Kafka.Avro` NuGet package to easily deserialize and process Kafka records.
+
+## Project Structure
+
+```bash
+examples/Kafka/Avro/src/
+├── Function.cs # Entry point for the Lambda function
+├── aws-lambda-tools-defaults.json # Default argument settings for AWS Lambda deployment
+├── template.yaml # AWS SAM template for deploying the function
+├── CustomerProfile.avsc # Avro schema definition file for the data structure used in the Kafka messages
+└── kafka-avro-event.json # Sample Avro event to test the function
+```
+
+## Prerequisites
+
+- [Dotnet](https://dotnet.microsoft.com/en-us/download/dotnet) (dotnet8 or later)
+- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html)
+- [AWS CLI](https://aws.amazon.com/cli/)
+- An AWS account with appropriate permissions
+- [Amazon MSK](https://aws.amazon.com/msk/) cluster set up with a topic to consume messages from
+- [AWS.Lambda.Powertools.Kafka.Avro](https://www.nuget.org/packages/AWS.Lambda.Powertools.Kafka.Avro/) NuGet package installed in your project
+- [Avro Tools](https://www.nuget.org/packages/Apache.Avro.Tools/) codegen tool to generate C# classes from the Avro schema
+
+## Installation
+
+1. Clone the repository:
+
+ ```bash
+ git clone https://github.com/aws-powertools/powertools-lambda-dotnet.git
+ ```
+
+2. Navigate to the project directory:
+
+ ```bash
+ cd powertools-lambda-dotnet/examples/Kafka/Avro/src
+ ```
+
+3. Build the project:
+
+ ```bash
+ dotnet build
+ ```
+4. Install the Avro Tools globally to generate C# classes from the Avro schema:
+
+ ```bash
+ dotnet tool install --global Apache.Avro.Tools
+ ```
+
+## Deployment
+
+Deploy the application using the AWS SAM CLI:
+
+```bash
+sam build
+sam deploy --guided
+```
+
+Follow the prompts to configure your deployment.
+
+## Avro Format
+Avro is a binary serialization format that provides a compact and efficient way to serialize structured data. It uses schemas to define the structure of the data, which allows for robust data evolution.
+
+In this example we provide a schema called `CustomerProfile.avsc`. The schema is used to serialize and deserialize the data in the Kafka messages.
+
+The C# classes are generated from the `CustomerProfile.avsc` schema file using the Avro Tools (`avrogen`) command:
+
+```xml
+
+
+
+```
+
+## Usage Examples
+
+Once deployed, you can test the Lambda function by sending a sample Avro event to the configured Kafka topic.
+You can use the `kafka-avro-event.json` file as a sample event to test the function.
+
+### Testing
+
+You can test the function locally using the AWS SAM CLI (Requires Docker to be installed):
+
+```bash
+sam local invoke AvroDeserializationFunction --event kafka-avro-event.json
+```
+
+This command simulates an invocation of the Lambda function with the provided event data.
+
+## How It Works
+
+1. **Event Source**: Configure your Lambda functions with an MSK or self-managed Kafka cluster as an event source.
+2. **Deserializing Records**: Powertools handles deserializing the records based on the specified format.
+3. **Processing**: Each record is processed within the handler function.
+
+## Event Deserialization
+
+Pass the `PowertoolsKafkaAvroSerializer` to the `LambdaBootstrapBuilder.Create()` method to enable Avro deserialization of Kafka records:
+
+```csharp
+await LambdaBootstrapBuilder.Create((Func, ILambdaContext, string>?)Handler,
+ new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
+ .Build()
+ .RunAsync();
+ ```
+
+## Configuration
+
+The SAM template (`template.yaml`) defines the following Lambda function:
+
+- **AvroDeserializationFunction**: Handles Avro-formatted Kafka messages
+
+## Customization
+
+To customize the examples:
+
+1. Modify the schema definitions to match your data structures
+2. Update the handler logic to process the records according to your requirements
+
+## Resources
+
+- [AWS Lambda Powertools for .NET Documentation](https://docs.powertools.aws.dev/lambda/dotnet/)
+- [Amazon MSK Documentation](https://docs.aws.amazon.com/msk/)
+- [AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/)
+- [Apache Avro Documentation](https://avro.apache.org/docs/)
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/aws-lambda-tools-defaults.json b/examples/Kafka/Avro/src/aws-lambda-tools-defaults.json
new file mode 100644
index 000000000..cd93437eb
--- /dev/null
+++ b/examples/Kafka/Avro/src/aws-lambda-tools-defaults.json
@@ -0,0 +1,15 @@
+{
+ "Information": [
+ "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+ "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+ "dotnet lambda help",
+ "All the command line options for the Lambda command can be specified in this file."
+ ],
+ "profile": "",
+ "region": "",
+ "configuration": "Release",
+ "function-runtime": "dotnet8",
+ "function-memory-size": 512,
+ "function-timeout": 30,
+ "function-handler": "Avro.Example"
+}
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/kafka-avro-event.json b/examples/Kafka/Avro/src/kafka-avro-event.json
new file mode 100644
index 000000000..6f5e045e3
--- /dev/null
+++ b/examples/Kafka/Avro/src/kafka-avro-event.json
@@ -0,0 +1,23 @@
+{
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/CustomerCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+ "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "records": {
+ "customer-topic-0": [
+ {
+ "topic": "customer-topic",
+ "partition": 0,
+ "offset": 15,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": "dXNlcl85NzU0",
+ "value": "EnVzZXJfOTc1NBxVc2VyIHVzZXJfOTc1NCh1c2VyXzk3NTRAaWNsb3VkLmNvbQABahg5MzQwIE1haW4gU3QQU2FuIEpvc2UEQ0EGVVNBCjM5NTk2AhgyNDQtNDA3LTg4NzECAAYQdGltZXpvbmUOZW5hYmxlZBBsYW5ndWFnZRBkaXNhYmxlZBpub3RpZmljYXRpb25zCGRhcmsABA==",
+ "headers": [
+ {
+ "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Avro/src/template.yaml b/examples/Kafka/Avro/src/template.yaml
new file mode 100644
index 000000000..a08325be2
--- /dev/null
+++ b/examples/Kafka/Avro/src/template.yaml
@@ -0,0 +1,27 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: >
+ kafka
+
+ Sample SAM Template for kafka
+
+# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
+Globals:
+ Function:
+ Timeout: 15
+ MemorySize: 512
+ Runtime: dotnet8
+
+Resources:
+ AvroDeserializationFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Handler: Avro.Example
+ Architectures:
+ - x86_64
+ Tracing: Active
+      Environment: # Powertools env vars: https://docs.powertools.aws.dev/lambda/dotnet/#environment-variables
+ Variables:
+ POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
+ POWERTOOLS_LOG_LEVEL: Info
+ POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase (Default)
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Function.cs b/examples/Kafka/Json/src/Function.cs
new file mode 100644
index 000000000..d7d96bfca
--- /dev/null
+++ b/examples/Kafka/Json/src/Function.cs
@@ -0,0 +1,21 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.RuntimeSupport;
+using AWS.Lambda.Powertools.Kafka;
+using AWS.Lambda.Powertools.Kafka.Json;
+using AWS.Lambda.Powertools.Logging;
+using Json.Models;
+
+string Handler(ConsumerRecords records, ILambdaContext context)
+{
+ foreach (var record in records)
+ {
+ Logger.LogInformation("Record Value: {@record}", record.Value);
+ }
+
+ return "Processed " + records.Count() + " records";
+}
+
+await LambdaBootstrapBuilder.Create((Func, ILambdaContext, string>?)Handler,
+ new PowertoolsKafkaJsonSerializer()) // Use PowertoolsKafkaJsonSerializer for Json serialization
+ .Build()
+ .RunAsync();
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Json.csproj b/examples/Kafka/Json/src/Json.csproj
new file mode 100644
index 000000000..aba6cde89
--- /dev/null
+++ b/examples/Kafka/Json/src/Json.csproj
@@ -0,0 +1,30 @@
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ true
+ Lambda
+
+ true
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
+
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Models/Address.cs b/examples/Kafka/Json/src/Models/Address.cs
new file mode 100644
index 000000000..a011b3cee
--- /dev/null
+++ b/examples/Kafka/Json/src/Models/Address.cs
@@ -0,0 +1,16 @@
+using System.Text.Json.Serialization;
+
+namespace Json.Models;
+
+public partial class Address
+{
+ [JsonPropertyName("street")] public string Street { get; set; }
+
+ [JsonPropertyName("city")] public string City { get; set; }
+
+ [JsonPropertyName("state")] public string State { get; set; }
+
+ [JsonPropertyName("country")] public string Country { get; set; }
+
+ [JsonPropertyName("zip_code")] public string ZipCode { get; set; }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Models/CustomerProfile.cs b/examples/Kafka/Json/src/Models/CustomerProfile.cs
new file mode 100644
index 000000000..1e7ab62b6
--- /dev/null
+++ b/examples/Kafka/Json/src/Models/CustomerProfile.cs
@@ -0,0 +1,22 @@
+using System.Text.Json.Serialization;
+
+namespace Json.Models;
+
+public partial class CustomerProfile
+{
+ [JsonPropertyName("user_id")] public string UserId { get; set; }
+
+ [JsonPropertyName("full_name")] public string FullName { get; set; }
+
+ [JsonPropertyName("email")] public Email Email { get; set; }
+
+ [JsonPropertyName("age")] public long Age { get; set; }
+
+ [JsonPropertyName("address")] public Address Address { get; set; }
+
+ [JsonPropertyName("phone_numbers")] public List PhoneNumbers { get; set; }
+
+ [JsonPropertyName("preferences")] public Preferences Preferences { get; set; }
+
+ [JsonPropertyName("account_status")] public string AccountStatus { get; set; }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Models/Email.cs b/examples/Kafka/Json/src/Models/Email.cs
new file mode 100644
index 000000000..045118baf
--- /dev/null
+++ b/examples/Kafka/Json/src/Models/Email.cs
@@ -0,0 +1,12 @@
+using System.Text.Json.Serialization;
+
+namespace Json.Models;
+
+public partial class Email
+{
+ [JsonPropertyName("address")] public string Address { get; set; }
+
+ [JsonPropertyName("verified")] public bool Verified { get; set; }
+
+ [JsonPropertyName("primary")] public bool Primary { get; set; }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Models/PhoneNumber.cs b/examples/Kafka/Json/src/Models/PhoneNumber.cs
new file mode 100644
index 000000000..7681265d1
--- /dev/null
+++ b/examples/Kafka/Json/src/Models/PhoneNumber.cs
@@ -0,0 +1,10 @@
+using System.Text.Json.Serialization;
+
+namespace Json.Models;
+
+public partial class PhoneNumber
+{
+ [JsonPropertyName("number")] public string Number { get; set; }
+
+ [JsonPropertyName("type")] public string Type { get; set; }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Models/Preferences.cs b/examples/Kafka/Json/src/Models/Preferences.cs
new file mode 100644
index 000000000..5dd84aa99
--- /dev/null
+++ b/examples/Kafka/Json/src/Models/Preferences.cs
@@ -0,0 +1,12 @@
+using System.Text.Json.Serialization;
+
+namespace Json.Models;
+
+public partial class Preferences
+{
+ [JsonPropertyName("language")] public string Language { get; set; }
+
+ [JsonPropertyName("notifications")] public string Notifications { get; set; }
+
+ [JsonPropertyName("timezone")] public string Timezone { get; set; }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/Readme.md b/examples/Kafka/Json/src/Readme.md
new file mode 100644
index 000000000..4315f2da7
--- /dev/null
+++ b/examples/Kafka/Json/src/Readme.md
@@ -0,0 +1,111 @@
+# AWS Powertools for AWS Lambda .NET - Kafka Json Example
+
+This project demonstrates how to use AWS Lambda Powertools for .NET with Amazon MSK (Managed Streaming for Kafka) to process events from Kafka topics.
+
+## Overview
+
+This example showcases a Lambda function that consumes messages from Kafka topics with Json serialization format.
+
+It uses the `AWS.Lambda.Powertools.Kafka.Json` NuGet package to easily deserialize and process Kafka records.
+
+## Project Structure
+
+```bash
+examples/Kafka/Json/src/
+├── Function.cs # Entry point for the Lambda function
+├── aws-lambda-tools-defaults.json # Default argument settings for AWS Lambda deployment
+├── template.yaml # AWS SAM template for deploying the function
+└── kafka-json-event.json # Sample Json event to test the function
+```
+
+## Prerequisites
+
+- [Dotnet](https://dotnet.microsoft.com/en-us/download/dotnet) (dotnet8 or later)
+- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html)
+- [AWS CLI](https://aws.amazon.com/cli/)
+- An AWS account with appropriate permissions
+- [Amazon MSK](https://aws.amazon.com/msk/) cluster set up with a topic to consume messages from
+- [AWS.Lambda.Powertools.Kafka.Json](https://www.nuget.org/packages/AWS.Lambda.Powertools.Kafka.Json/) NuGet package installed in your project
+
+## Installation
+
+1. Clone the repository:
+
+ ```bash
+ git clone https://github.com/aws-powertools/powertools-lambda-dotnet.git
+ ```
+
+2. Navigate to the project directory:
+
+ ```bash
+ cd powertools-lambda-dotnet/examples/Kafka/Json/src
+ ```
+
+3. Build the project:
+
+ ```bash
+ dotnet build
+ ```
+
+## Deployment
+
+Deploy the application using the AWS SAM CLI:
+
+```bash
+sam build
+sam deploy --guided
+```
+
+Follow the prompts to configure your deployment.
+
+
+## Usage Examples
+
+Once deployed, you can test the Lambda function by sending a sample Json event to the configured Kafka topic.
+You can use the `kafka-json-event.json` file as a sample event to test the function.
+
+### Testing
+
+You can test the function locally using the AWS SAM CLI (Requires Docker to be installed):
+
+```bash
+sam local invoke JsonDeserializationFunction --event kafka-json-event.json
+```
+
+This command simulates an invocation of the Lambda function with the provided event data.
+
+## How It Works
+
+1. **Event Source**: Configure your Lambda functions with an MSK or self-managed Kafka cluster as an event source.
+2. **Deserializing Records**: Powertools handles deserializing the records based on the specified format.
+3. **Processing**: Each record is processed within the handler function.
+
+## Event Deserialization
+
+Pass the `PowertoolsKafkaJsonSerializer` to the `LambdaBootstrapBuilder.Create()` method to enable JSON deserialization of Kafka records:
+
+```csharp
+await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
+        new PowertoolsKafkaJsonSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
+ .Build()
+ .RunAsync();
+ ```
+
+## Configuration
+
+The SAM template (`template.yaml`) defines one Lambda function:
+
+- **JsonDeserializationFunction**: Handles json-formatted Kafka messages
+
+## Customization
+
+To customize the examples:
+
+1. Modify the schema definitions to match your data structures
+2. Update the handler logic to process the records according to your requirements
+
+## Resources
+
+- [AWS Lambda Powertools for .NET Documentation](https://docs.powertools.aws.dev/lambda/dotnet/)
+- [Amazon MSK Documentation](https://docs.aws.amazon.com/msk/)
+- [AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/)
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/aws-lambda-tools-defaults.json b/examples/Kafka/Json/src/aws-lambda-tools-defaults.json
new file mode 100644
index 000000000..fb3240903
--- /dev/null
+++ b/examples/Kafka/Json/src/aws-lambda-tools-defaults.json
@@ -0,0 +1,15 @@
+{
+ "Information": [
+ "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+ "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+ "dotnet lambda help",
+ "All the command line options for the Lambda command can be specified in this file."
+ ],
+ "profile": "",
+ "region": "",
+ "configuration": "Release",
+ "function-runtime": "dotnet8",
+ "function-memory-size": 512,
+ "function-timeout": 30,
+ "function-handler": "Json"
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/kafka-json-event.json b/examples/Kafka/Json/src/kafka-json-event.json
new file mode 100644
index 000000000..66dc2ab5a
--- /dev/null
+++ b/examples/Kafka/Json/src/kafka-json-event.json
@@ -0,0 +1,23 @@
+{
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/CustomerCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+ "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "records": {
+ "customer-topic-0": [
+ {
+ "topic": "customer-topic",
+ "partition": 0,
+ "offset": 15,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": "dXNlcl85NzU0",
+ "value": "eyJwaG9uZV9udW1iZXJzIjpbeyJudW1iZXIiOiIyNDQtNDA3LTg4NzEiLCJ0eXBlIjoiV09SSyJ9XSwicHJlZmVyZW5jZXMiOnsidGltZXpvbmUiOiJlbmFibGVkIiwibGFuZ3VhZ2UiOiJkaXNhYmxlZCIsIm5vdGlmaWNhdGlvbnMiOiJkYXJrIn0sImZ1bGxfbmFtZSI6IlVzZXIgdXNlcl85NzU0IiwiYWRkcmVzcyI6eyJjb3VudHJ5IjoiVVNBIiwiY2l0eSI6IlNhbiBKb3NlIiwic3RyZWV0IjoiOTM0MCBNYWluIFN0Iiwic3RhdGUiOiJDQSIsInppcF9jb2RlIjoiMzk1OTYifSwidXNlcl9pZCI6InVzZXJfOTc1NCIsImFjY291bnRfc3RhdHVzIjoiU1VTUEVOREVEIiwiYWdlIjo1MywiZW1haWwiOnsiYWRkcmVzcyI6InVzZXJfOTc1NEBpY2xvdWQuY29tIiwidmVyaWZpZWQiOmZhbHNlLCJwcmltYXJ5Ijp0cnVlfX0=",
+ "headers": [
+ {
+ "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Json/src/template.yaml b/examples/Kafka/Json/src/template.yaml
new file mode 100644
index 000000000..dd4bfb9ff
--- /dev/null
+++ b/examples/Kafka/Json/src/template.yaml
@@ -0,0 +1,27 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: >
+ kafka
+
+ Sample SAM Template for kafka
+
+# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
+Globals:
+ Function:
+ Timeout: 15
+ MemorySize: 512
+ Runtime: dotnet8
+
+Resources:
+ JsonDeserializationFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Handler: Json
+ Architectures:
+ - x86_64
+ Tracing: Active
+ Environment: # Powertools env vars: https://awslabs.github.io/aws-lambda-powertools-python/#environment-variables
+ Variables:
+ POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
+ POWERTOOLS_LOG_LEVEL: Info
+ POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase (Default)
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/CustomerProfile.proto b/examples/Kafka/JsonClassLibrary/src/CustomerProfile.proto
new file mode 100644
index 000000000..9c69b1c41
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/CustomerProfile.proto
@@ -0,0 +1,49 @@
+syntax = "proto3";
+
+package com.example;
+
+enum PhoneType {
+ HOME = 0;
+ WORK = 1;
+ MOBILE = 2;
+}
+
+enum AccountStatus {
+ ACTIVE = 0;
+ INACTIVE = 1;
+ SUSPENDED = 2;
+}
+
+// EmailAddress message
+message EmailAddress {
+ string address = 1;
+ bool verified = 2;
+ bool primary = 3;
+}
+
+// Address message
+message Address {
+ string street = 1;
+ string city = 2;
+ string state = 3;
+ string country = 4;
+ string zip_code = 5;
+}
+
+// PhoneNumber message
+message PhoneNumber {
+ string number = 1;
+ PhoneType type = 2;
+}
+
+// CustomerProfile message
+message CustomerProfile {
+ string user_id = 1;
+ string full_name = 2;
+ EmailAddress email = 3;
+ int32 age = 4;
+ Address address = 5;
+ repeated PhoneNumber phone_numbers = 6;
+ map preferences = 7;
+ AccountStatus account_status = 8;
+}
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/Function.cs b/examples/Kafka/JsonClassLibrary/src/Function.cs
new file mode 100644
index 000000000..98795029e
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/Function.cs
@@ -0,0 +1,32 @@
+using Amazon.Lambda.Core;
+using AWS.Lambda.Powertools.Kafka;
+using AWS.Lambda.Powertools.Kafka.Protobuf;
+using AWS.Lambda.Powertools.Logging;
+using Com.Example;
+
+// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
+[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]
+
+namespace ProtoBufClassLibrary;
+
+public class Function
+{
+ public string FunctionHandler(ConsumerRecords records, ILambdaContext context)
+ {
+ foreach (var record in records)
+ {
+ Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
+ Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
+ Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
+
+ foreach (var header in record.Headers.DecodedValues())
+ {
+ Logger.LogInformation($"{header.Key}: {header.Value}");
+ }
+
+ Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
+ }
+
+ return "Processed " + records.Count() + " records";
+ }
+}
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/ProtoBufClassLibrary.csproj b/examples/Kafka/JsonClassLibrary/src/ProtoBufClassLibrary.csproj
new file mode 100644
index 000000000..a28e1a2f8
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/ProtoBufClassLibrary.csproj
@@ -0,0 +1,42 @@
+
+
+ net8.0
+ enable
+ enable
+ true
+ Lambda
+
+ true
+
+ true
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
+
+ Client
+ Public
+ True
+ True
+ obj/Debug/net8.0/
+ MSBuild:Compile
+ PreserveNewest
+
+
+
+
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/Readme.md b/examples/Kafka/JsonClassLibrary/src/Readme.md
new file mode 100644
index 000000000..ae7e610f4
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/Readme.md
@@ -0,0 +1,130 @@
+# AWS Powertools for AWS Lambda .NET - Kafka Protobuf Example
+
+This project demonstrates how to use AWS Lambda Powertools for .NET with Amazon MSK (Managed Streaming for Kafka) to process events from Kafka topics.
+
+## Overview
+
+This example showcases a Lambda function that consumes messages from Kafka topics with Protocol Buffers serialization format.
+
+It uses the `AWS.Lambda.Powertools.Kafka.Protobuf` NuGet package to easily deserialize and process Kafka records.
+
+## Project Structure
+
+```bash
+examples/Kafka/Protobuf/src/
+├── Function.cs # Entry point for the Lambda function
+├── aws-lambda-tools-defaults.json # Default argument settings for AWS Lambda deployment
+├── template.yaml # AWS SAM template for deploying the function
+├── CustomerProfile.proto # Protocol Buffers definition file for the data structure used in the Kafka messages
+└── kafka-protobuf-event.json # Sample Protocol Buffers event to test the function
+```
+
+## Prerequisites
+
+- [Dotnet](https://dotnet.microsoft.com/en-us/download/dotnet) (dotnet8 or later)
+- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html)
+- [AWS CLI](https://aws.amazon.com/cli/)
+- An AWS account with appropriate permissions
+- [Amazon MSK](https://aws.amazon.com/msk/) cluster set up with a topic to consume messages from
+- [AWS.Lambda.Powertools.Kafka.Protobuf](https://www.nuget.org/packages/AWS.Lambda.Powertools.Kafka.Protobuf/) NuGet package installed in your project
+
+## Installation
+
+1. Clone the repository:
+
+ ```bash
+ git clone https://github.com/aws-powertools/powertools-lambda-dotnet.git
+ ```
+
+2. Navigate to the project directory:
+
+ ```bash
+ cd powertools-lambda-dotnet/examples/Kafka/Protobuf/src
+ ```
+
+3. Build the project:
+
+ ```bash
+ dotnet build
+ ```
+
+## Deployment
+
+Deploy the application using the AWS SAM CLI:
+
+```bash
+sam build
+sam deploy --guided
+```
+
+Follow the prompts to configure your deployment.
+
+## Protocol Buffers Format
+
+The Protobuf example handles messages serialized with Protocol Buffers. The schema is defined in the `CustomerProfile.proto` file, and the C# code is generated from that schema.
+
+This requires the `Grpc.Tools` package to deserialize the messages correctly.
+
+And update the `.csproj` file to include the `.proto` files.
+
+```xml
+
+ Client
+ Public
+ True
+ True
+ obj\Debug/net8.0/
+ MSBuild:Compile
+ PreserveNewest
+
+```
+
+## Usage Examples
+
+Once deployed, you can test the Lambda function by sending a sample Protocol Buffers event to the configured Kafka topic.
+You can use the `kafka-protobuf-event.json` file as a sample event to test the function.
+
+### Testing
+
+You can test the function locally using the AWS SAM CLI (Requires Docker to be installed):
+
+```bash
+sam local invoke ProtobufDeserializationFunction --event kafka-protobuf-event.json
+```
+
+This command simulates an invocation of the Lambda function with the provided event data.
+
+## How It Works
+
+1. **Event Source**: Configure your Lambda functions with an MSK or self-managed Kafka cluster as an event source.
+2. **Deserializing Records**: Powertools handles deserializing the records based on the specified format.
+3. **Processing**: Each record is processed within the handler function.
+
+## Event Deserialization
+
+Register the `PowertoolsKafkaProtobufSerializer` using the assembly-level `LambdaSerializer` attribute:
+
+```csharp
+[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]
+ ```
+
+## Configuration
+
+The SAM template (`template.yaml`) defines one Lambda function:
+
+- **ProtobufDeserializationFunction**: Handles Protobuf-formatted Kafka messages
+
+## Customization
+
+To customize the examples:
+
+1. Modify the schema definitions to match your data structures
+2. Update the handler logic to process the records according to your requirements
+3. Ensure you have the proper `.proto` files and that they are included in your project for Protocol Buffers serialization/deserialization.
+
+## Resources
+
+- [AWS Lambda Powertools for .NET Documentation](https://docs.powertools.aws.dev/lambda/dotnet/)
+- [Amazon MSK Documentation](https://docs.aws.amazon.com/msk/)
+- [AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/)
+- [Protocol Buffers Documentation](https://developers.google.com/protocol-buffers)
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/aws-lambda-tools-defaults.json b/examples/Kafka/JsonClassLibrary/src/aws-lambda-tools-defaults.json
new file mode 100644
index 000000000..d4ec43f14
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/aws-lambda-tools-defaults.json
@@ -0,0 +1,16 @@
+{
+ "Information": [
+ "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+ "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+ "dotnet lambda help",
+ "All the command line options for the Lambda command can be specified in this file."
+ ],
+ "profile": "",
+ "region": "",
+ "configuration": "Release",
+ "function-architecture": "x86_64",
+ "function-runtime": "dotnet8",
+ "function-memory-size": 512,
+ "function-timeout": 30,
+ "function-handler": "ProtoBufClassLibrary::ProtoBufClassLibrary.Function::FunctionHandler"
+}
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/kafka-protobuf-event.json b/examples/Kafka/JsonClassLibrary/src/kafka-protobuf-event.json
new file mode 100644
index 000000000..6731ceb40
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/kafka-protobuf-event.json
@@ -0,0 +1,23 @@
+{
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/CustomerCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+ "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "records": {
+ "customer-topic-0": [
+ {
+ "topic": "customer-topic",
+ "partition": 0,
+ "offset": 15,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": "dXNlcl85NzU0",
+ "value": "Cgl1c2VyXzk3NTQSDlVzZXIgdXNlcl85NzU0GhgKFHVzZXJfOTc1NEBpY2xvdWQuY29tGAEgNSooCgw5MzQwIE1haW4gU3QSCFNhbiBKb3NlGgJDQSIDVVNBKgUzOTU5NjIQCgwyNDQtNDA3LTg4NzEQAToUCghsYW5ndWFnZRIIZGlzYWJsZWQ6FQoNbm90aWZpY2F0aW9ucxIEZGFyazoTCgh0aW1lem9uZRIHZW5hYmxlZEAC",
+ "headers": [
+ {
+ "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/examples/Kafka/JsonClassLibrary/src/template.yaml b/examples/Kafka/JsonClassLibrary/src/template.yaml
new file mode 100644
index 000000000..0df5feaa2
--- /dev/null
+++ b/examples/Kafka/JsonClassLibrary/src/template.yaml
@@ -0,0 +1,27 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: >
+ kafka
+
+ Sample SAM Template for kafka
+
+# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
+Globals:
+ Function:
+ Timeout: 15
+ MemorySize: 512
+ Runtime: dotnet8
+
+Resources:
+ ProtobufClassLibraryDeserializationFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Handler: ProtoBufClassLibrary::ProtoBufClassLibrary.Function::FunctionHandler
+ Architectures:
+ - x86_64
+ Tracing: Active
+ Environment: # Powertools env vars: https://awslabs.github.io/aws-lambda-powertools-python/#environment-variables
+ Variables:
+ POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
+ POWERTOOLS_LOG_LEVEL: Info
+ POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase (Default)
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/CustomerProfile.proto b/examples/Kafka/Protobuf/src/CustomerProfile.proto
new file mode 100644
index 000000000..9c69b1c41
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/CustomerProfile.proto
@@ -0,0 +1,49 @@
+syntax = "proto3";
+
+package com.example;
+
+enum PhoneType {
+ HOME = 0;
+ WORK = 1;
+ MOBILE = 2;
+}
+
+enum AccountStatus {
+ ACTIVE = 0;
+ INACTIVE = 1;
+ SUSPENDED = 2;
+}
+
+// EmailAddress message
+message EmailAddress {
+ string address = 1;
+ bool verified = 2;
+ bool primary = 3;
+}
+
+// Address message
+message Address {
+ string street = 1;
+ string city = 2;
+ string state = 3;
+ string country = 4;
+ string zip_code = 5;
+}
+
+// PhoneNumber message
+message PhoneNumber {
+ string number = 1;
+ PhoneType type = 2;
+}
+
+// CustomerProfile message
+message CustomerProfile {
+ string user_id = 1;
+ string full_name = 2;
+ EmailAddress email = 3;
+ int32 age = 4;
+ Address address = 5;
+ repeated PhoneNumber phone_numbers = 6;
+ map preferences = 7;
+ AccountStatus account_status = 8;
+}
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/Function.cs b/examples/Kafka/Protobuf/src/Function.cs
new file mode 100644
index 000000000..446328696
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/Function.cs
@@ -0,0 +1,22 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.RuntimeSupport;
+using AWS.Lambda.Powertools.Kafka;
+using AWS.Lambda.Powertools.Kafka.Protobuf;
+using AWS.Lambda.Powertools.Logging;
+using Com.Example;
+
+string Handler(ConsumerRecords records, ILambdaContext context)
+{
+ foreach (var record in records)
+ {
+ Logger.LogInformation("Record Value: {@record}", record.Value);
+ }
+
+ return "Processed " + records.Count() + " records";
+}
+
+await LambdaBootstrapBuilder.Create((Func, ILambdaContext, string>?)Handler,
+ new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaProtobufSerializer for Protobuf serialization
+ .Build()
+ .RunAsync();
+
diff --git a/examples/Kafka/Protobuf/src/Protobuf.csproj b/examples/Kafka/Protobuf/src/Protobuf.csproj
new file mode 100644
index 000000000..275fa84ec
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/Protobuf.csproj
@@ -0,0 +1,43 @@
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ true
+ Lambda
+
+ true
+
+ true
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+ PreserveNewest
+
+
+
+
+ Client
+ Public
+ True
+ True
+ obj\Debug/net8.0/
+ MSBuild:Compile
+ PreserveNewest
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/Readme.md b/examples/Kafka/Protobuf/src/Readme.md
new file mode 100644
index 000000000..886bbffa1
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/Readme.md
@@ -0,0 +1,133 @@
+# AWS Powertools for AWS Lambda .NET - Kafka Protobuf Example
+
+This project demonstrates how to use AWS Lambda Powertools for .NET with Amazon MSK (Managed Streaming for Kafka) to process events from Kafka topics.
+
+## Overview
+
+This example showcases a Lambda function that consumes messages from Kafka topics with Protocol Buffers serialization format.
+
+It uses the `AWS.Lambda.Powertools.Kafka.Protobuf` NuGet package to easily deserialize and process Kafka records.
+
+## Project Structure
+
+```bash
+examples/Kafka/Protobuf/src/
+├── Function.cs # Entry point for the Lambda function
+├── aws-lambda-tools-defaults.json # Default argument settings for AWS Lambda deployment
+├── template.yaml # AWS SAM template for deploying the function
+├── CustomerProfile.proto # Protocol Buffers definition file for the data structure used in the Kafka messages
+└── kafka-protobuf-event.json # Sample Protocol Buffers event to test the function
+```
+
+## Prerequisites
+
+- [Dotnet](https://dotnet.microsoft.com/en-us/download/dotnet) (dotnet8 or later)
+- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html)
+- [AWS CLI](https://aws.amazon.com/cli/)
+- An AWS account with appropriate permissions
+- [Amazon MSK](https://aws.amazon.com/msk/) cluster set up with a topic to consume messages from
+- [AWS.Lambda.Powertools.Kafka.Protobuf](https://www.nuget.org/packages/AWS.Lambda.Powertools.Kafka.Protobuf/) NuGet package installed in your project
+
+## Installation
+
+1. Clone the repository:
+
+ ```bash
+ git clone https://github.com/aws-powertools/powertools-lambda-dotnet.git
+ ```
+
+2. Navigate to the project directory:
+
+ ```bash
+ cd powertools-lambda-dotnet/examples/Kafka/Protobuf/src
+ ```
+
+3. Build the project:
+
+ ```bash
+ dotnet build
+ ```
+
+## Deployment
+
+Deploy the application using the AWS SAM CLI:
+
+```bash
+sam build
+sam deploy --guided
+```
+
+Follow the prompts to configure your deployment.
+
+## Protocol Buffers Format
+
+The Protobuf example handles messages serialized with Protocol Buffers. The schema is defined in the `CustomerProfile.proto` file, and the C# code is generated from that schema.
+
+This requires the `Grpc.Tools` package to deserialize the messages correctly.
+
+And update the `.csproj` file to include the `.proto` files.
+
+```xml
+
+ Client
+ Public
+ True
+ True
+ obj\Debug/net8.0/
+ MSBuild:Compile
+ PreserveNewest
+
+```
+
+## Usage Examples
+
+Once deployed, you can test the Lambda function by sending a sample Protocol Buffers event to the configured Kafka topic.
+You can use the `kafka-protobuf-event.json` file as a sample event to test the function.
+
+### Testing
+
+You can test the function locally using the AWS SAM CLI (Requires Docker to be installed):
+
+```bash
+sam local invoke ProtobufDeserializationFunction --event kafka-protobuf-event.json
+```
+
+This command simulates an invocation of the Lambda function with the provided event data.
+
+## How It Works
+
+1. **Event Source**: Configure your Lambda functions with an MSK or self-managed Kafka cluster as an event source.
+2. **Deserializing Records**: Powertools handles deserializing the records based on the specified format.
+3. **Processing**: Each record is processed within the handler function.
+
+## Event Deserialization
+
+Pass the `PowertoolsKafkaProtobufSerializer` to the `LambdaBootstrapBuilder.Create()` method to enable Protobuf deserialization of Kafka records:
+
+```csharp
+await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
+        new PowertoolsKafkaProtobufSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
+ .Build()
+ .RunAsync();
+ ```
+
+## Configuration
+
+The SAM template (`template.yaml`) defines one Lambda function:
+
+- **ProtobufDeserializationFunction**: Handles Protobuf-formatted Kafka messages
+
+## Customization
+
+To customize the examples:
+
+1. Modify the schema definitions to match your data structures
+2. Update the handler logic to process the records according to your requirements
+3. Ensure you have the proper `.proto` files and that they are included in your project for Protocol Buffers serialization/deserialization.
+
+## Resources
+
+- [AWS Lambda Powertools for .NET Documentation](https://docs.powertools.aws.dev/lambda/dotnet/)
+- [Amazon MSK Documentation](https://docs.aws.amazon.com/msk/)
+- [AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/)
+- [Protocol Buffers Documentation](https://developers.google.com/protocol-buffers)
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/aws-lambda-tools-defaults.json b/examples/Kafka/Protobuf/src/aws-lambda-tools-defaults.json
new file mode 100644
index 000000000..1a1c5de1d
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/aws-lambda-tools-defaults.json
@@ -0,0 +1,15 @@
+{
+ "Information": [
+ "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+ "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+ "dotnet lambda help",
+ "All the command line options for the Lambda command can be specified in this file."
+ ],
+ "profile": "",
+ "region": "",
+ "configuration": "Release",
+ "function-runtime": "dotnet8",
+ "function-memory-size": 512,
+ "function-timeout": 30,
+ "function-handler": "Protobuf"
+}
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/kafka-protobuf-event.json b/examples/Kafka/Protobuf/src/kafka-protobuf-event.json
new file mode 100644
index 000000000..6731ceb40
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/kafka-protobuf-event.json
@@ -0,0 +1,23 @@
+{
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/CustomerCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+ "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "records": {
+ "customer-topic-0": [
+ {
+ "topic": "customer-topic",
+ "partition": 0,
+ "offset": 15,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": "dXNlcl85NzU0",
+ "value": "Cgl1c2VyXzk3NTQSDlVzZXIgdXNlcl85NzU0GhgKFHVzZXJfOTc1NEBpY2xvdWQuY29tGAEgNSooCgw5MzQwIE1haW4gU3QSCFNhbiBKb3NlGgJDQSIDVVNBKgUzOTU5NjIQCgwyNDQtNDA3LTg4NzEQAToUCghsYW5ndWFnZRIIZGlzYWJsZWQ6FQoNbm90aWZpY2F0aW9ucxIEZGFyazoTCgh0aW1lem9uZRIHZW5hYmxlZEAC",
+ "headers": [
+ {
+ "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/examples/Kafka/Protobuf/src/template.yaml b/examples/Kafka/Protobuf/src/template.yaml
new file mode 100644
index 000000000..b8f7df6a5
--- /dev/null
+++ b/examples/Kafka/Protobuf/src/template.yaml
@@ -0,0 +1,27 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: >
+ kafka
+
+ Sample SAM Template for kafka
+
+# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
+Globals:
+ Function:
+ Timeout: 15
+ MemorySize: 512
+ Runtime: dotnet8
+
+Resources:
+ ProtobufDeserializationFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Handler: Protobuf
+ Architectures:
+ - x86_64
+ Tracing: Active
+ Environment: # Powertools env vars: https://awslabs.github.io/aws-lambda-powertools-python/#environment-variables
+ Variables:
+ POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
+ POWERTOOLS_LOG_LEVEL: Info
+ POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase (Default)
\ No newline at end of file
diff --git a/examples/examples.sln b/examples/examples.sln
index 10ec48509..6b9fa877a 100644
--- a/examples/examples.sln
+++ b/examples/examples.sln
@@ -109,6 +109,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AOT_Logging", "AOT\AOT_Logg
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AOT_Logging.Tests", "AOT\AOT_Logging\test\AOT_Logging.Tests\AOT_Logging.Tests.csproj", "{FC010A0E-64A9-4440-97FE-DEDA8CEE0BE5}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Kafka", "Kafka", "{71027B81-CA39-498C-9A50-ADDAFA2AC2F5}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Json", "Kafka\Json\src\Json.csproj", "{58EC305E-353A-4996-A541-3CF7FC0EDD80}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Protobuf", "Kafka\Protobuf\src\Protobuf.csproj", "{853F6FE9-1762-4BA3-BAF4-2FCD605B81CF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Avro", "Kafka\Avro\src\Avro.csproj", "{B03F22B2-315C-429B-9CC0-C15BE94CBF77}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProtoBufClassLibrary", "Kafka\JsonClassLibrary\src\ProtoBufClassLibrary.csproj", "{B6B3136D-B739-4917-AD3D-30F19FE12D3F}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -202,6 +212,22 @@ Global
{FC010A0E-64A9-4440-97FE-DEDA8CEE0BE5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC010A0E-64A9-4440-97FE-DEDA8CEE0BE5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC010A0E-64A9-4440-97FE-DEDA8CEE0BE5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {58EC305E-353A-4996-A541-3CF7FC0EDD80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {58EC305E-353A-4996-A541-3CF7FC0EDD80}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {58EC305E-353A-4996-A541-3CF7FC0EDD80}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {58EC305E-353A-4996-A541-3CF7FC0EDD80}.Release|Any CPU.Build.0 = Release|Any CPU
+ {853F6FE9-1762-4BA3-BAF4-2FCD605B81CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {853F6FE9-1762-4BA3-BAF4-2FCD605B81CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {853F6FE9-1762-4BA3-BAF4-2FCD605B81CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {853F6FE9-1762-4BA3-BAF4-2FCD605B81CF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B03F22B2-315C-429B-9CC0-C15BE94CBF77}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B03F22B2-315C-429B-9CC0-C15BE94CBF77}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B03F22B2-315C-429B-9CC0-C15BE94CBF77}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B03F22B2-315C-429B-9CC0-C15BE94CBF77}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B6B3136D-B739-4917-AD3D-30F19FE12D3F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B6B3136D-B739-4917-AD3D-30F19FE12D3F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B6B3136D-B739-4917-AD3D-30F19FE12D3F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B6B3136D-B739-4917-AD3D-30F19FE12D3F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{0CC66DBC-C1DF-4AF6-8EEB-FFED6C578BF4} = {526F1EF7-5A9C-4BFF-ABAE-75992ACD8F78}
@@ -249,5 +275,9 @@ Global
{343CF6B9-C006-43F8-924C-BF5BF5B6D051} = {FE1CAA26-87E9-4B71-800E-81D2997A7B53}
{FC02CF45-DE15-4413-958A-D86808B99146} = {FEE72EAB-494F-403B-A75A-825E713C3D43}
{FC010A0E-64A9-4440-97FE-DEDA8CEE0BE5} = {F3480212-EE7F-46FE-9ED5-24ACAB5B681D}
+ {58EC305E-353A-4996-A541-3CF7FC0EDD80} = {71027B81-CA39-498C-9A50-ADDAFA2AC2F5}
+ {853F6FE9-1762-4BA3-BAF4-2FCD605B81CF} = {71027B81-CA39-498C-9A50-ADDAFA2AC2F5}
+ {B03F22B2-315C-429B-9CC0-C15BE94CBF77} = {71027B81-CA39-498C-9A50-ADDAFA2AC2F5}
+ {B6B3136D-B739-4917-AD3D-30F19FE12D3F} = {71027B81-CA39-498C-9A50-ADDAFA2AC2F5}
EndGlobalSection
EndGlobal
diff --git a/libraries/AWS.Lambda.Powertools.sln b/libraries/AWS.Lambda.Powertools.sln
index c3056d147..325c683e0 100644
--- a/libraries/AWS.Lambda.Powertools.sln
+++ b/libraries/AWS.Lambda.Powertools.sln
@@ -113,6 +113,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Event
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore", "src\AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore\AWS.Lambda.Powertools.EventHandler.Resolvers.BedrockAgentFunction.AspNetCore.csproj", "{8A22F22E-D10A-4897-A89A-DC76C267F6BB}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka", "src\AWS.Lambda.Powertools.Kafka\AWS.Lambda.Powertools.Kafka.csproj", "{5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka.Tests", "tests\AWS.Lambda.Powertools.Kafka.Tests\AWS.Lambda.Powertools.Kafka.Tests.csproj", "{FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka.Avro", "src\AWS.Lambda.Powertools.Kafka.Avro\AWS.Lambda.Powertools.Kafka.Avro.csproj", "{25F0929B-2E04-4ED6-A0ED-5379A0A755B0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka.Json", "src\AWS.Lambda.Powertools.Kafka.Json\AWS.Lambda.Powertools.Kafka.Json.csproj", "{9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Lambda.Powertools.Kafka.Protobuf", "src\AWS.Lambda.Powertools.Kafka.Protobuf\AWS.Lambda.Powertools.Kafka.Protobuf.csproj", "{B640DB80-C982-407B-A2EC-CD29AC77DDB8}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -618,6 +628,66 @@ Global
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x64.Build.0 = Release|Any CPU
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x86.ActiveCfg = Release|Any CPU
{8A22F22E-D10A-4897-A89A-DC76C267F6BB}.Release|x86.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x64.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Debug|x86.Build.0 = Debug|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x64.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x64.Build.0 = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x86.ActiveCfg = Release|Any CPU
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3}.Release|x86.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x64.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Debug|x86.Build.0 = Debug|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x64.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x64.Build.0 = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x86.ActiveCfg = Release|Any CPU
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645}.Release|x86.Build.0 = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|x64.Build.0 = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Debug|x86.Build.0 = Debug|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|x64.ActiveCfg = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|x64.Build.0 = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|x86.ActiveCfg = Release|Any CPU
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0}.Release|x86.Build.0 = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|x64.Build.0 = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Debug|x86.Build.0 = Debug|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|x64.ActiveCfg = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|x64.Build.0 = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|x86.ActiveCfg = Release|Any CPU
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E}.Release|x86.Build.0 = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|x64.Build.0 = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Debug|x86.Build.0 = Debug|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|x64.ActiveCfg = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|x64.Build.0 = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|x86.ActiveCfg = Release|Any CPU
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
@@ -671,5 +741,10 @@ Global
{F4B8D5AF-D3CA-4910-A14D-E5BAEF0FD1DE} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
{281F7EB5-ACE5-458F-BC88-46A8899DF3BA} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
{8A22F22E-D10A-4897-A89A-DC76C267F6BB} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {5B0DDE6F-ED16-452F-90D3-F0B6086D51B3} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {FDBDB9F8-B3E2-4ACA-9FC6-E12FF3D95645} = {1CFF5568-8486-475F-81F6-06105C437528}
+ {25F0929B-2E04-4ED6-A0ED-5379A0A755B0} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {9E2B8160-3E76-4B33-86AB-DE35A5FCDB1E} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
+ {B640DB80-C982-407B-A2EC-CD29AC77DDB8} = {73C9B1E5-3893-47E8-B373-17E5F5D7E6F5}
EndGlobalSection
EndGlobal
diff --git a/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/AWS.Lambda.Powertools.Kafka.Avro.csproj b/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/AWS.Lambda.Powertools.Kafka.Avro.csproj
new file mode 100644
index 000000000..255e852a6
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/AWS.Lambda.Powertools.Kafka.Avro.csproj
@@ -0,0 +1,21 @@
+
+
+
+
+
+ AWS.Lambda.Powertools.Kafka.Avro
+ Powertools for AWS Lambda (.NET) - Kafka Avro consumer package.
+ AWS.Lambda.Powertools.Kafka.Avro
+ AWS.Lambda.Powertools.Kafka.Avro
+ net8.0
+ false
+ enable
+ enable
+
+
+
+
+
+
+
+
diff --git a/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs b/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs
new file mode 100644
index 000000000..4bf3ea7cb
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs
@@ -0,0 +1,134 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Diagnostics.CodeAnalysis;
+using System.Reflection;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Avro;
+using Avro.IO;
+using Avro.Specific;
+
+namespace AWS.Lambda.Powertools.Kafka.Avro;
+
+/// <summary>
+/// A Lambda serializer for Kafka events that handles Avro-formatted data.
+/// This serializer automatically deserializes the Avro binary format from base64-encoded strings
+/// in Kafka records and converts them to strongly-typed objects.
+/// </summary>
+/// <example>
+/// <code>
+/// [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]
+///
+/// // Your Lambda handler will receive properly deserialized objects
+/// public class Function
+/// {
+/// public void Handler(ConsumerRecords<string, Customer> records, ILambdaContext context)
+/// {
+/// foreach (var record in records)
+/// {
+/// Customer customer = record.Value;
+/// context.Logger.LogInformation($"Processed customer {customer.Name}, age {customer.Age}");
+/// }
+/// }
+/// }
+/// </code>
+/// </example>
+public class PowertoolsKafkaAvroSerializer : PowertoolsKafkaSerializerBase
+{
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PowertoolsKafkaAvroSerializer"/> class
+ /// with default JSON serialization options.
+ /// </summary>
+ public PowertoolsKafkaAvroSerializer() : base()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PowertoolsKafkaAvroSerializer"/> class
+ /// with custom JSON serialization options.
+ /// </summary>
+ /// <param name="jsonOptions">Custom JSON serializer options to use during deserialization.</param>
+ public PowertoolsKafkaAvroSerializer(JsonSerializerOptions jsonOptions) : base(jsonOptions)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PowertoolsKafkaAvroSerializer"/> class
+ /// with a JSON serializer context for AOT-compatible serialization.
+ /// </summary>
+ /// <param name="serializerContext">JSON serializer context for AOT compatibility.</param>
+ public PowertoolsKafkaAvroSerializer(JsonSerializerContext serializerContext) : base(serializerContext)
+ {
+ }
+
+ /// <summary>
+ /// Gets the Avro schema for the specified type.
+ /// The type must have a public static _SCHEMA field defined.
+ /// </summary>
+ /// <param name="payloadType">The type to get the Avro schema for.</param>
+ /// <returns>The Avro Schema object, or null if the type has no _SCHEMA field.</returns>
+ /// <exception cref="InvalidOperationException">Thrown if no schema is found for the type.</exception>
+ [RequiresDynamicCode("Avro schema access requires reflection which may be incompatible with AOT.")]
+ [RequiresUnreferencedCode("Avro schema access requires reflection which may be incompatible with trimming.")]
+ private Schema? GetAvroSchema([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)] Type payloadType)
+ {
+ var schemaField = payloadType.GetField("_SCHEMA",
+ BindingFlags.Public | BindingFlags.Static);
+
+ if (schemaField == null)
+ return null;
+
+ return schemaField.GetValue(null) as Schema;
+ }
+
+ /// <summary>
+ /// Deserializes complex (non-primitive) types using Avro format.
+ /// </summary>
+ /// <param name="data">The binary data to deserialize.</param>
+ /// <param name="targetType">The type to deserialize to.</param>
+ /// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
+ /// <returns>The deserialized object.</returns>
+ [RequiresDynamicCode("Avro deserialization might require runtime code generation.")]
+ [RequiresUnreferencedCode("Avro deserialization might require types that cannot be statically analyzed.")]
+ protected override object? DeserializeComplexTypeFormat(byte[] data,
+ [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)]
+ Type targetType, bool isKey)
+ {
+ try
+ {
+ // Try to get Avro schema for the type
+ var schema = GetAvroSchema(targetType);
+
+ if (schema != null)
+ {
+ using var stream = new MemoryStream(data);
+ var decoder = new BinaryDecoder(stream);
+ var reader = new SpecificDatumReader