From c339f0526ae49d02614649b72588ea6b599ac99c Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Fri, 20 Jun 2025 18:06:04 +0200 Subject: [PATCH 1/3] docs(kafka): finalize Kafka docs --- docs/features/kafka.md | 158 ++++++--- .../kafka/advancedBasicErrorHandling.ts | 52 +++ .../kafka/advancedHandlingLargeMessages.ts | 42 +++ .../kafka/advancedParserErrorHandling.ts | 60 ++++ .../snippets/kafka/advancedTestingYourCode.ts | 45 +++ .../kafka/advancedWorkingWithArkType.ts | 7 +- .../advancedWorkingWithRecordMetadata.ts | 10 +- .../kafka/advancedWorkingWithValibot.ts | 7 +- .../snippets/kafka/advancedWorkingWithZod.ts | 7 +- .../kafka/gettingStartedPrimitiveValues.ts | 2 +- .../advancedBatchSizeConfiguration.yaml | 21 ++ packages/kafka/README.md | 305 ++++++++++++++++++ packages/kafka/src/consumer.ts | 9 +- packages/kafka/src/errors.ts | 17 +- packages/kafka/tests/unit/consumer.test.ts | 11 +- 15 files changed, 669 insertions(+), 84 deletions(-) create mode 100644 examples/snippets/kafka/advancedBasicErrorHandling.ts create mode 100644 examples/snippets/kafka/advancedHandlingLargeMessages.ts create mode 100644 examples/snippets/kafka/advancedParserErrorHandling.ts create mode 100644 examples/snippets/kafka/advancedTestingYourCode.ts create mode 100644 examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml diff --git a/docs/features/kafka.md b/docs/features/kafka.md index 0ccd15078b..c8cfdc8802 100644 --- a/docs/features/kafka.md +++ b/docs/features/kafka.md @@ -72,7 +72,7 @@ Depending on the schema types you want to use, install the library and the corre Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType. 
-### Required resources + ### Using ESM with Schema Registry @@ -97,19 +97,19 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format === "Avro Messages" - ```typescript + ```typescript hl_lines="2-3 8-13 15 19" --8<-- "examples/snippets/kafka/gettingStartedAvro.ts" ``` === "Protocol Buffers" - ```typescript + ```typescript hl_lines="1-2 8-13 15 19" --8<-- "examples/snippets/kafka/gettingStartedProtobuf.ts" ``` === "JSON Messages" - ```typescript + ```typescript hl_lines="1-2 7-11 13 17" --8<-- "examples/snippets/kafka/gettingStartedJson.ts" ``` @@ -119,7 +119,7 @@ The `kafkaConsumer` function can deserialize both keys and values independently === "index.ts" - ```typescript + ```typescript hl_lines="9 13 22 25-26" --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:func" ``` @@ -197,7 +197,7 @@ Each Kafka record contains important metadata that you can access alongside the === "Working with Record Metadata" - ```typescript + ```typescript hl_lines="10" --8<-- "examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts" ``` @@ -205,19 +205,21 @@ For debugging purposes, you can also access the original key, value, and headers #### Available metadata properties -| Property | Description | Example Use Case | -|-------------------|-----------------------------------------------------|---------------------------------------------| -| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers | -| `partition` | Kafka partition number | Tracking message distribution | -| `offset` | Position in the partition | De-duplication, exactly-once processing | -| `timestamp` | Unix timestamp when record was created | Event timing analysis | -| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification | -| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs | -| `key` | Deserialized message key | Customer ID or entity identifier | -| `value` | Deserialized message content | The actual business data | -| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization | -| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization | -| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization | +| Property | Description | Example Use Case | +|-----------------------|------------------------------------------------------------------|---------------------------------------------------------------------| +| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers | +| `partition` | Kafka partition number | Tracking message distribution | +| `offset` | Position in the partition | De-duplication, exactly-once processing | +| `timestamp` | Unix timestamp when record was created | Event timing analysis | +| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification | +| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs | +| `key` | Deserialized message key | Customer ID or entity identifier | +| `value` | Deserialized message content | The actual business data | +| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization | +| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization | +| `originalHeaders` | Base64-encoded original message headers | Debugging or custom 
deserialization | +| `valueSchemaMetadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation | +| `keySchemaMetadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation | ### Custom output serializers @@ -225,44 +227,112 @@ You can parse deserialized data using your preferred parsing library. This can h === "Zod" - ```typescript + ```typescript hl_lines="25 29" --8<-- "examples/snippets/kafka/advancedWorkingWithZod.ts" ``` === "Valibot" - ```typescript + ```typescript hl_lines="28 32" --8<-- "examples/snippets/kafka/advancedWorkingWithValibot.ts" ``` === "ArkType" - ```typescript + ```typescript hl_lines="25 29" --8<-- "examples/snippets/kafka/advancedWorkingWithArkType.ts" ``` -#### Exception types +### Error handling + +Handle errors gracefully when processing Kafka messages to ensure your application maintains resilience and provides clear diagnostic information. The Kafka consumer utility provides specific exception types to help you identify and handle deserialization issues effectively. + +!!! tip + Fields like `value`, `key`, and `headers` are decoded lazily, meaning they are only deserialized when accessed. This allows you to handle deserialization errors at the point of access rather than when the record is first processed. + +=== "Basic Error Handling" + + ```typescript hl_lines="29 36 45" + --8<-- "examples/snippets/kafka/advancedBasicErrorHandling.ts:3" + ``` + + 1. If you want to handle deserialization and parsing errors, you should destructure or access the `value`, `key`, or `headers` properties of the record within the `for...of` loop. + +=== "Parser Error Handling" -| Exception | Description | Common Causes | -|-----------|-------------|---------------| -| `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration | -| `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema | -| `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) | -| `KafkaConsumerOutputSerializerError` | Raised when output serializer fails | Error in custom serializer function, incompatible data, or validation failures in Pydantic models | + ```typescript hl_lines="41 44" + --8<-- "examples/snippets/kafka/advancedParserErrorHandling.ts:3" + ``` + + 1. The `cause` property of the error is populated with the original Standard Schema parsing error, allowing you to access detailed information about the parsing failure. + +#### Error types + +| Exception | Description | Common Causes | +|--------------------------------------|-----------------------------------------------|-----------------------------------------------------------------------------| +| `KafkaConsumerError`. 
| Base class for all Kafka consumer errors | General unhandled errors |
+| `KafkaConsumerDeserializationError` | Thrown when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
+| `KafkaConsumerMissingSchemaError` | Thrown when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
+| `KafkaConsumerParserError` | Thrown when additional schema parsing fails | Parsing failures in Standard Schema models |
 
 ### Integrating with Idempotency
 
-When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
+When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The [Idempotency utility](./idempotency.md) prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
 
 The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics.
 
+!!! tip
+    By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
+
 === "Idempotent Kafka Processing"
 
-    ```typescript
+    ```typescript hl_lines="44 51"
     --8<-- "examples/snippets/kafka/advancedWorkingWithIdempotency.ts"
     ```
 
-TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
+### Best practices
+
+#### Handling large messages
+
+When processing large Kafka messages in Lambda, be mindful of memory limitations. Although the Kafka consumer utility optimizes memory usage, large deserialized messages can still exhaust your function's resources.
+
+=== "Handling Large Messages"
+
+    ```typescript hl_lines="18-20"
+    --8<-- "examples/snippets/kafka/advancedHandlingLargeMessages.ts:6"
+    ```
+
+For large messages, consider these proven approaches:
+
+* **Store the data:** use Amazon S3 and include only the S3 reference in your Kafka message
+* **Split large payloads:** use multiple smaller messages with sequence identifiers
+* **Increase memory:** increase your Lambda function's memory allocation, which also increases CPU capacity
+
+#### Batch size configuration
+
+The number of Kafka records processed per Lambda invocation is controlled by your Event Source Mapping configuration. Properly sized batches optimize cost and performance.
+
+=== "Batch size configuration"
+
+    ```yaml hl_lines="16"
+    --8<-- "examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml"
+    ```
+
+Different workloads benefit from different batch configurations:
+
+* **High-volume, simple processing:** Use larger batches (100-500 records) with a short timeout
+* **Complex processing with database operations:** Use smaller batches (10-50 records)
+* **Mixed message sizes:** Set an appropriate batching window (1-5 seconds) to handle variability
+
+#### Cross-language compatibility
+
+When using binary serialization formats across multiple programming languages, ensure consistent schema handling to prevent deserialization failures.
+
+Common cross-language challenges to address:
+
+* **Field naming conventions:** camelCase in Java vs snake_case in Python
+* **Date/time representation:** differences in how dates and timestamps are serialized
+* **Numeric precision handling:** especially decimals, doubles, and floats
 
 ### Troubleshooting
 
@@ -274,14 +344,13 @@ First, check that your schema definition exactly matches the message format. Eve
 
 For binary messages that fail to deserialize, examine the raw encoded data:
 
-```python
-# DO NOT include this code in production handlers
-# For troubleshooting purposes only
+```javascript
+// DO NOT include this code in production handlers
+// For troubleshooting purposes only
-import base64
-raw_bytes = base64.b64decode(record.raw_value)
-print(f"Message size: {len(raw_bytes)} bytes")
-print(f"First 50 bytes (hex): {raw_bytes[:50].hex()}")
+const rawBytes = Buffer.from(record.originalValue, 'base64');
+console.log(`Message size: ${rawBytes.length} bytes`);
+console.log(`First 50 bytes (hex): ${rawBytes.slice(0, 50).toString('hex')}`);
 ```
 
 #### Schema compatibility issues
@@ -311,7 +380,7 @@ For timeout issues:
 
 * Implement chunked or asynchronous processing patterns for time-consuming operations
 * Monitor and optimize database operations, external API calls, or other I/O operations in your handler
 
-???+ tip "Monitoring memory usage"
+!!! tip "Monitoring memory usage"
     Use CloudWatch metrics to track your function's memory utilization. If it consistently exceeds 80% of allocated memory, consider increasing the memory allocation or optimizing your code.
 
 ## Kafka consumer workflow
@@ -342,4 +411,10 @@
 
 ## Testing your code
 
-TBD
+Testing Kafka consumer code requires simulating Lambda events with Kafka messages. You can create simple test cases using local JSON files without needing a live Kafka cluster. Below is an example of how to simulate a JSON message.
+ +=== "Testing your code" + + ```typescript + --8<-- "examples/snippets/kafka/advancedTestingYourCode.ts" + ``` diff --git a/examples/snippets/kafka/advancedBasicErrorHandling.ts b/examples/snippets/kafka/advancedBasicErrorHandling.ts new file mode 100644 index 0000000000..e0eb9aef26 --- /dev/null +++ b/examples/snippets/kafka/advancedBasicErrorHandling.ts @@ -0,0 +1,52 @@ +declare function processRecord(record: unknown): Promise; + +import { readFileSync } from 'node:fs'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { KafkaConsumerDeserializationError } from '@aws-lambda-powertools/kafka/errors'; +import type { ConsumerRecord } from '@aws-lambda-powertools/kafka/types'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.AVRO, + schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'), + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + const results: { + successful: number; + failed: Array>; + } = { + successful: 0, + failed: [], + }; + for (const record of event.records) { + try { + const { value, partition, offset, topic } = record; // (1)! + logger.setCorrelationId(`${topic}-${partition}-${offset}`); + + await processRecord(value); + + results.successful += 1; + } catch (error) { + if (error instanceof KafkaConsumerDeserializationError) { + results.failed.push(record); + logger.error('Error deserializing message', { error }); + } else { + logger.error('Error processing message', { error }); + } + } + + if (results.failed.length > 0) { + // Handle failed records, e.g., send to a dead-letter queue + } + + logger.info('Successfully processed records', { + successful: results.successful, + }); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/advancedHandlingLargeMessages.ts b/examples/snippets/kafka/advancedHandlingLargeMessages.ts new file mode 100644 index 0000000000..a9ac47909e --- /dev/null +++ b/examples/snippets/kafka/advancedHandlingLargeMessages.ts @@ -0,0 +1,42 @@ +declare function processRecordFromS3({ + key, + bucket, +}: { key: string; bucket: string }): Promise; + +import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { object, safeParse, string } from 'valibot'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const LargeMessage = object({ + key: string(), + bucket: string(), +}); + +export const handler = kafkaConsumer(async (event, _context) => { + for (const record of event.records) { + const { topic, value, originalValue } = record; + const valueSize = Buffer.byteLength(originalValue, 'utf8'); + const parsedValue = safeParse(LargeMessage, value); + if ( + topic === 'product-catalog' && + valueSize > 3_000_000 && + parsedValue.success + ) { + logger.info('Large message detected, processing from S3', { + size: valueSize, + }); + + const { key, bucket } = parsedValue.output; + await processRecordFromS3({ key, bucket }); + + logger.info('Processed large message from S3', { + key, + bucket, + }); + } + + // regular processing of the record + } +}); diff --git a/examples/snippets/kafka/advancedParserErrorHandling.ts b/examples/snippets/kafka/advancedParserErrorHandling.ts new file mode 100644 index 0000000000..dfece49460 --- /dev/null +++ 
b/examples/snippets/kafka/advancedParserErrorHandling.ts @@ -0,0 +1,60 @@ +declare function processRecord(record: unknown): Promise; + +import { readFileSync } from 'node:fs'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { KafkaConsumerParserError } from '@aws-lambda-powertools/kafka/errors'; +import type { ConsumerRecord } from '@aws-lambda-powertools/kafka/types'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { z } from 'zod/v4'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.AVRO, + schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'), + parserSchema: z.object({ + id: z.number(), + name: z.string(), + email: z.email(), + }), + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + const results: { + successful: number; + failed: Array>; + } = { + successful: 0, + failed: [], + }; + for (const record of event.records) { + try { + const { value, partition, offset, topic } = record; + logger.setCorrelationId(`${topic}-${partition}-${offset}`); + + await processRecord(value); + results.successful += 1; + } catch (error) { + if (error instanceof KafkaConsumerParserError) { + results.failed.push(record); + logger.error( + `Error deserializing message - ${z.prettifyError({ issues: error.cause } as z.ZodError)}`, + { error } // (1)! + ); + } else { + logger.error('Error processing message', { error }); + } + } + + if (results.failed.length > 0) { + // Handle failed records, e.g., send to a dead-letter queue + } + + logger.info('Successfully processed records', { + successful: results.successful, + }); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/advancedTestingYourCode.ts b/examples/snippets/kafka/advancedTestingYourCode.ts new file mode 100644 index 0000000000..8de8694b19 --- /dev/null +++ b/examples/snippets/kafka/advancedTestingYourCode.ts @@ -0,0 +1,45 @@ +import type { MSKEvent } from '@aws-lambda-powertools/kafka/types'; +import type { Context } from 'aws-lambda'; +import { expect, it } from 'vitest'; +import { handler } from './gettingStartedPrimitiveValues.js'; + +it('handles complex protobuf messages from Glue Schema Registry', async () => { + // Prepare + const event = { + eventSource: 'aws:kafka', + eventSourceArn: + 'arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster/12345678-1234-1234-1234-123456789012-1', + bootstrapServers: + 'b-1.mskcluster.abcde12345.us-east-1.kafka.amazonaws.com:9092', + records: { + 'orders-topic': [ + { + topic: 'orders-topic', + partition: 0, + offset: 15, + timestamp: 1545084650987, + timestampType: 'CREATE_TIME', + headers: [], + key: undefined, + keySchemaMetadata: { + dataFormat: 'JSON', + }, + valueSchemaMetadata: { + dataFormat: 'JSON', + schemaId: undefined, + }, + value: Buffer.from( + JSON.stringify({ order_id: '12345', amount: 99.95 }) + ).toString('base64'), + }, + ], + }, + } as MSKEvent; + + // Act + const result = await handler(event, {} as Context); + + // Assess + expect(result).toBeDefined(); + // You can add more specific assertions based on your handler's logic +}); diff --git a/examples/snippets/kafka/advancedWorkingWithArkType.ts b/examples/snippets/kafka/advancedWorkingWithArkType.ts index 3e7cc5b37e..f0926efad7 100644 --- a/examples/snippets/kafka/advancedWorkingWithArkType.ts +++ b/examples/snippets/kafka/advancedWorkingWithArkType.ts @@ -28,9 +28,10 @@ 
const schemaConfig = { export const handler = kafkaConsumer( async (event, _context) => { - for (const { - value: { id, items }, - } of event.records) { + for (const record of event.records) { + const { + value: { id, items }, + } = record; logger.setCorrelationId(id); logger.debug(`order includes ${items.length} items`); } diff --git a/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts index 23fdc79ad7..7bda7f5f2d 100644 --- a/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts +++ b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts @@ -6,14 +6,8 @@ const logger = new Logger({ serviceName: 'kafka-consumer' }); export const handler = kafkaConsumer( async (event, _context) => { - for (const { - value, - topic, - partition, - offset, - timestamp, - headers, - } of event.records) { + for (const record of event.records) { + const { value, topic, partition, offset, timestamp, headers } = record; logger.info(`processing message from topic ${topic}`, { partition, offset, diff --git a/examples/snippets/kafka/advancedWorkingWithValibot.ts b/examples/snippets/kafka/advancedWorkingWithValibot.ts index ff33c818b4..29c691aa4f 100644 --- a/examples/snippets/kafka/advancedWorkingWithValibot.ts +++ b/examples/snippets/kafka/advancedWorkingWithValibot.ts @@ -31,9 +31,10 @@ const schemaConfig = { export const handler = kafkaConsumer>( async (event, _context) => { - for (const { - value: { id, items }, - } of event.records) { + for (const record of event.records) { + const { + value: { id, items }, + } = record; logger.setCorrelationId(id); logger.debug(`order includes ${items.length} items`); } diff --git a/examples/snippets/kafka/advancedWorkingWithZod.ts b/examples/snippets/kafka/advancedWorkingWithZod.ts index 3ebe835484..6bb9240f1d 100644 --- a/examples/snippets/kafka/advancedWorkingWithZod.ts +++ b/examples/snippets/kafka/advancedWorkingWithZod.ts @@ -28,9 +28,10 @@ const schemaConfig = { export const handler = kafkaConsumer>( async (event, _context) => { - for (const { - value: { id, items }, - } of event.records) { + for (const record of event.records) { + const { + value: { id, items }, + } = record; logger.setCorrelationId(id); logger.debug(`order includes ${items.length} items`); } diff --git a/examples/snippets/kafka/gettingStartedPrimitiveValues.ts b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts index dcf30e939d..96b03a08bf 100644 --- a/examples/snippets/kafka/gettingStartedPrimitiveValues.ts +++ b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts @@ -1,4 +1,4 @@ -import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import { Logger } from '@aws-lambda-powertools/logger'; const logger = new Logger({ serviceName: 'kafka-consumer' }); diff --git a/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml b/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml new file mode 100644 index 0000000000..8c9cf98b32 --- /dev/null +++ b/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml @@ -0,0 +1,21 @@ +Resources: +OrderProcessingFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.9 + Events: + KafkaEvent: + Type: MSK + Properties: + Stream: !GetAtt OrdersMSKCluster.Arn + Topics: + - order-events + - payment-events + # Configuration for optimal throughput/latency balance + BatchSize: 100 + 
MaximumBatchingWindowInSeconds: 5 + StartingPosition: LATEST + # Enable partial batch success reporting + FunctionResponseTypes: + - ReportBatchItemFailures \ No newline at end of file diff --git a/packages/kafka/README.md b/packages/kafka/README.md index e69de29bb2..e60682ec3a 100644 --- a/packages/kafka/README.md +++ b/packages/kafka/README.md @@ -0,0 +1,305 @@ +# Powertools for AWS Lambda (TypeScript) - Kafka Utility + +Powertools for AWS Lambda (TypeScript) is a developer toolkit to implement Serverless [best practices and increase developer velocity](https://docs.powertools.aws.dev/lambda/typescript/latest/#features). + +You can use the package in both TypeScript and JavaScript code bases. + +## Intro + +The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem. + +## Usage + +To get started, depending on the schema types you want to use, install the library and the corresponding libraries: + +For JSON schemas: + +```bash +npm install @aws-lambda-powertools/kafka +``` + +For Avro schemas: + +```bash +npm install @aws-lambda-powertools/kafka avro-js +``` + +For Protobuf schemas: + +```bash +npm install @aws-lambda-powertools/kafka protobufjs +``` + +Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType. + +### Deserialization + +The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format. 
+ +#### JSON Schema + +```ts +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); +``` + +#### Avro Schema + +```ts +import { readFileSync } from 'node:fs'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.AVRO, + schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'), + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); +``` + +#### Protobuf Schema + +```ts +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { User } from './samples/user.es6.generated.js'; // protobuf generated class + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.PROTOBUF, + schema: User, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); +``` + +### Additional Parsing + +You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type hints, runtime parsing and validation, and advanced data transformations. 
+ +#### Zod + +```ts +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { z } from 'zod/v4'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = z.object({ + productId: z.string(), + quantity: z.number().int().positive(), + price: z.number().positive(), +}); + +const OrderSchema = z.object({ + id: z.string(), + customerId: z.string(), + items: z.array(OrderItemSchema).min(1, 'Order must have at least one item'), + createdAt: z.iso.datetime(), + totalAmount: z.number().positive(), +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer>( + async (event, _context) => { + for (const record of event.records) { + const { + value: { id, items }, + } = record; + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); +``` + +#### Valibot + +```ts +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import * as v from 'valibot'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = v.object({ + productId: v.string(), + quantity: v.pipe(v.number(), v.integer(), v.toMinValue(1)), + price: v.pipe(v.number(), v.integer()), +}); + +const OrderSchema = v.object({ + id: v.string(), + customerId: v.string(), + items: v.pipe( + v.array(OrderItemSchema), + v.minLength(1, 'Order must have at least one item') + ), + createdAt: v.pipe(v.string(), v.isoDateTime()), + totalAmount: v.pipe(v.number(), v.toMinValue(0)), +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer>( + async (event, _context) => { + for (const record of event.records) { + const { + value: { id, items }, + } = record; + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); +``` + +#### ArkType + +```ts +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { type } from 'arktype'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = type({ + productId: 'string', + quantity: 'number.integer >= 1', + price: 'number.integer', +}); + +const OrderSchema = type({ + id: 'string', + customerId: 'string', + items: OrderItemSchema.array().moreThanLength(0), + createdAt: 'string.date', + totalAmount: 'number.integer >= 0', +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer( + async (event, _context) => { + for (const record of event.records) { + const { + value: { id, items }, + } = record; + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); +``` + +See the [documentation](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka) for more details on how to use the Kafka utility. 
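When a `parserSchema` rejects a payload, the wrapped handler surfaces a `KafkaConsumerParserError` at the point where the record's `value` is accessed. The sketch below is a minimal illustration of that pattern; the `OrderSchema` here is a hypothetical, cut-down schema, and the full version lives in the Parser Error Handling snippet of the feature docs.

```ts
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { KafkaConsumerParserError } from '@aws-lambda-powertools/kafka/errors';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod/v4';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// Hypothetical, cut-down schema for illustration only
const OrderSchema = z.object({
  id: z.string(),
  totalAmount: z.number().positive(),
});

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
    parserSchema: OrderSchema,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    try {
      // `value` is deserialized and parsed lazily, so parsing errors surface here
      const { value } = record;
      logger.info('processing order', { value });
    } catch (error) {
      if (error instanceof KafkaConsumerParserError) {
        // `cause` carries the original Standard Schema parsing issues
        logger.error('record failed schema parsing', { error });
        continue;
      }
      throw error;
    }
  }
}, schemaConfig);
```

Catching the error inside the `for...of` loop lets the rest of the batch continue processing while failed records can be routed elsewhere, for example to a dead-letter queue.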
+ +## Contribute + +If you are interested in contributing to this project, please refer to our [Contributing Guidelines](https://github.com/aws-powertools/powertools-lambda-typescript/blob/main/CONTRIBUTING.md). + +## Roadmap + +The roadmap of Powertools for AWS Lambda (TypeScript) is driven by customers’ demand. +Help us prioritize upcoming functionalities or utilities by [upvoting existing RFCs and feature requests](https://github.com/aws-powertools/powertools-lambda-typescript/issues), or [creating new ones](https://github.com/aws-powertools/powertools-lambda-typescript/issues/new/choose), in this GitHub repository. + +## Connect + +* **Powertools for AWS Lambda on Discord**: `#typescript` - **[Invite link](https://discord.gg/B8zZKbbyET)** +* **Email**: + +## How to support Powertools for AWS Lambda (TypeScript)? + +### Becoming a reference customer + +Knowing which companies are using this library is important to help prioritize the project internally. If your company is using Powertools for AWS Lambda (TypeScript), you can request to have your name and logo added to the README file by raising a [Support Powertools for AWS Lambda (TypeScript) (become a reference)](https://s12d.com/become-reference-pt-ts) issue. + +The following companies, among others, use Powertools: + +* [Alma Media](https://www.almamedia.fi) +* [AppYourself](https://appyourself.net) +* [Bailey Nelson](https://www.baileynelson.com.au) +* [Banxware](https://www.banxware.com) +* [Caylent](https://caylent.com/) +* [Certible](https://www.certible.com/) +* [Elva](https://elva-group.com) +* [Flyweight](https://flyweight.io/) +* [globaldatanet](https://globaldatanet.com/) +* [Guild](https://guild.com) +* [Hashnode](https://hashnode.com/) +* [Instil](https://instil.co/) +* [LocalStack](https://localstack.cloud/) +* [Ours Privacy](https://oursprivacy.com/) +* [Perfect Post](https://www.perfectpost.fr) +* [Sennder](https://sennder.com/) +* [tecRacer GmbH & Co. KG](https://www.tecracer.com/) +* [Trek10](https://www.trek10.com/) +* [WeSchool](https://www.weschool.com) + +### Sharing your work + +Share what you did with Powertools for AWS Lambda (TypeScript) 💞💞. Blog post, workshops, presentation, sample apps and others. Check out what the community has [already shared](https://docs.powertools.aws.dev/lambda/typescript/latest/we_made_this) about Powertools for AWS Lambda (TypeScript). + +### Using Lambda Layer + +This helps us understand who uses Powertools for AWS Lambda (TypeScript) in a non-intrusive way, and helps us gain future investments for other Powertools for AWS Lambda languages. When [using Layers](https://docs.powertools.aws.dev/lambda/typescript/latest/getting-started/lambda-layers/), you can add Powertools as a dev dependency to not impact the development process. + +## License + +This library is licensed under the MIT-0 License. See the LICENSE file. 
diff --git a/packages/kafka/src/consumer.ts b/packages/kafka/src/consumer.ts index b71c7a3245..10436372f2 100644 --- a/packages/kafka/src/consumer.ts +++ b/packages/kafka/src/consumer.ts @@ -5,11 +5,10 @@ import type { Context, Handler } from 'aws-lambda'; import { deserialize as deserializeJson } from './deserializer/json.js'; import { deserialize as deserializePrimitive } from './deserializer/primitive.js'; import { - KafkaConsumerAvroMissingSchemaError, KafkaConsumerDeserializationError, KafkaConsumerError, + KafkaConsumerMissingSchemaError, KafkaConsumerParserError, - KafkaConsumerProtobufMissingSchemaError, } from './errors.js'; import type { ConsumerRecord, @@ -90,7 +89,7 @@ const deserialize = ({ if (config.type === 'avro') { if (!config.schema) { - throw new KafkaConsumerAvroMissingSchemaError( + throw new KafkaConsumerMissingSchemaError( 'Schema string is required for avro deserialization' ); } @@ -98,7 +97,7 @@ const deserialize = ({ } if (config.type === 'protobuf') { if (!config.schema) { - throw new KafkaConsumerProtobufMissingSchemaError( + throw new KafkaConsumerMissingSchemaError( 'Schema string is required for protobuf deserialization' ); } @@ -213,6 +212,8 @@ const deserializeRecord = async ( return deserializeHeaders(headers); }, originalHeaders: headers, + valueSchemaMetadata, + keySchemaMetadata, }; }; diff --git a/packages/kafka/src/errors.ts b/packages/kafka/src/errors.ts index dffea255a5..0686e21517 100644 --- a/packages/kafka/src/errors.ts +++ b/packages/kafka/src/errors.ts @@ -9,16 +9,6 @@ class KafkaConsumerError extends Error { } } -/** - * Error thrown when a required Protobuf schema is missing during Kafka message consumption. - */ -class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError { - constructor(message: string, options?: ErrorOptions) { - super(message, options); - this.name = 'KafkaConsumerProtobufMissingSchemaError'; - } -} - /** * Error thrown when deserialization of a Kafka message fails. */ @@ -32,10 +22,10 @@ class KafkaConsumerDeserializationError extends KafkaConsumerError { /** * Error thrown when a required Avro schema is missing during Kafka message consumption. 
*/ -class KafkaConsumerAvroMissingSchemaError extends KafkaConsumerError { +class KafkaConsumerMissingSchemaError extends KafkaConsumerError { constructor(message: string, options?: ErrorOptions) { super(message, options); - this.name = 'KafkaConsumerAvroMissingSchemaError'; + this.name = 'KafkaConsumerMissingSchemaError'; } } @@ -51,8 +41,7 @@ class KafkaConsumerParserError extends KafkaConsumerError { export { KafkaConsumerError, - KafkaConsumerAvroMissingSchemaError, + KafkaConsumerMissingSchemaError, KafkaConsumerDeserializationError, - KafkaConsumerProtobufMissingSchemaError, KafkaConsumerParserError, }; diff --git a/packages/kafka/tests/unit/consumer.test.ts b/packages/kafka/tests/unit/consumer.test.ts index 3b57f52c3d..f653be5497 100644 --- a/packages/kafka/tests/unit/consumer.test.ts +++ b/packages/kafka/tests/unit/consumer.test.ts @@ -1,10 +1,7 @@ import type { Context } from 'aws-lambda'; import { describe, expect, it } from 'vitest'; -import { z } from 'zod'; -import { - KafkaConsumerAvroMissingSchemaError, - KafkaConsumerProtobufMissingSchemaError, -} from '../../src/errors.js'; +import { z } from 'zod/v4'; +import { KafkaConsumerMissingSchemaError } from '../../src/errors.js'; import { SchemaType, kafkaConsumer } from '../../src/index.js'; import type { ConsumerRecords, MSKEvent } from '../../src/types/types.js'; import { loadEvent } from '../helpers/loadEvent.js'; @@ -131,12 +128,12 @@ describe('Kafka consumer', () => { { type: SchemaType.PROTOBUF, event: structuredClone(protobufTestEvent), - error: KafkaConsumerProtobufMissingSchemaError, + error: KafkaConsumerMissingSchemaError, }, { type: SchemaType.AVRO, event: structuredClone(avroTestEvent), - error: KafkaConsumerAvroMissingSchemaError, + error: KafkaConsumerMissingSchemaError, }, ])( 'throws when schemaStr not passed for $type event', From 54e65efbd99811343b54598f484748dbf25d64d1 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Fri, 20 Jun 2025 18:06:32 +0200 Subject: [PATCH 2/3] chore: title --- docs/features/kafka.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/features/kafka.md b/docs/features/kafka.md index c8cfdc8802..e6cb0cb9ff 100644 --- a/docs/features/kafka.md +++ b/docs/features/kafka.md @@ -4,9 +4,6 @@ description: Utility status: new --- -???+ info "Work in progress" - This documentation page is a work in progress for an upcoming feature in Powertools for AWS Lambda. If you're seeing this page, it means the release process is underway, but the feature is not yet available on npm. Please check back soon for the final version. - The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem. ```mermaid @@ -221,7 +218,7 @@ For debugging purposes, you can also access the original key, value, and headers | `valueSchemaMetadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation | | `keySchemaMetadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation | -### Custom output serializers +### Additional Parsing You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type hints, runtime parsing and validation, and advanced data transformations. 
From b3115ed6ba6695aa44a9d20ad806deed48a1359e Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Fri, 20 Jun 2025 18:19:11 +0200 Subject: [PATCH 3/3] chore: linting --- examples/snippets/kafka/advancedBasicErrorHandling.ts | 6 ++++-- examples/snippets/kafka/advancedParserErrorHandling.ts | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/snippets/kafka/advancedBasicErrorHandling.ts b/examples/snippets/kafka/advancedBasicErrorHandling.ts index e0eb9aef26..afb637b05e 100644 --- a/examples/snippets/kafka/advancedBasicErrorHandling.ts +++ b/examples/snippets/kafka/advancedBasicErrorHandling.ts @@ -3,8 +3,10 @@ declare function processRecord(record: unknown): Promise; import { readFileSync } from 'node:fs'; import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; import { KafkaConsumerDeserializationError } from '@aws-lambda-powertools/kafka/errors'; -import type { ConsumerRecord } from '@aws-lambda-powertools/kafka/types'; -import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import type { + ConsumerRecord, + SchemaConfig, +} from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; const logger = new Logger({ serviceName: 'kafka-consumer' }); diff --git a/examples/snippets/kafka/advancedParserErrorHandling.ts b/examples/snippets/kafka/advancedParserErrorHandling.ts index dfece49460..3b554c1aa0 100644 --- a/examples/snippets/kafka/advancedParserErrorHandling.ts +++ b/examples/snippets/kafka/advancedParserErrorHandling.ts @@ -3,8 +3,10 @@ declare function processRecord(record: unknown): Promise; import { readFileSync } from 'node:fs'; import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; import { KafkaConsumerParserError } from '@aws-lambda-powertools/kafka/errors'; -import type { ConsumerRecord } from '@aws-lambda-powertools/kafka/types'; -import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import type { + ConsumerRecord, + SchemaConfig, +} from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import { z } from 'zod/v4';
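The records returned by `kafkaConsumer` also expose the `valueSchemaMetadata` and `keySchemaMetadata` fields added in `consumer.ts` above. A minimal, hypothetical sketch of reading them in a handler follows; the property names match the metadata table in the docs, and everything else is illustrative.

```ts
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    const { value, valueSchemaMetadata } = record;
    // dataFormat and schemaId come straight from the Lambda event payload;
    // schemaId may be undefined when no schema registry is involved
    logger.info('received record', {
      value,
      dataFormat: valueSchemaMetadata?.dataFormat,
      schemaId: valueSchemaMetadata?.schemaId,
    });
  }
}, schemaConfig);
```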