diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/AsyncBatchProcessor.ts index 781c7f1c79..718caa1159 100644 --- a/packages/batch/src/AsyncBatchProcessor.ts +++ b/packages/batch/src/AsyncBatchProcessor.ts @@ -1,16 +1,39 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; +import type { + EventSourceType, + FailureResponse, + SuccessResponse, +} from './types'; /** - * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB + * Asynchronously process a batch of records from SQS, Kinesis Data Streams, and DynamoDB and report partial failures + * using native responses. + * + * When processing a batch of records, this processor will handle partial failures and + * return a response object that can be used to report partial failures and avoid reprocessing + * the same records. + * + * @example + * ```typescript + * import { + * AsyncBatchProcessor, + * EventType, + * } from '@aws-lambda-powertools/batch'; + * + * const processor = new AsyncBatchProcessor(EventType.SQS); + * ``` */ class AsyncBatchProcessor extends BasePartialBatchProcessor { + /** + * Process a record asynchronously using the provided handler. + * + * @param record Record to process within the batch + */ public async asyncProcessRecord( - record: BaseRecord + record: EventSourceType ): Promise { try { - const data = this.toBatchType(record, this.eventType); - const result = await this.handler(data, this.options); + const result = await this.handler(record, this.options?.context); return this.successHandler(record, result); } catch (error) { @@ -19,11 +42,13 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor { } /** - * Process a record with instance's handler - * @param record Batch record to be processed - * @returns response of success or failure + * Process a record synchronously using the provided handler. + * + * Throws an error if called on an async processor. Please use `asyncProcessRecord()` instead. */ - public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { + public processRecord( + _record: EventSourceType + ): SuccessResponse | FailureResponse { throw new Error('Not implemented. Use asyncProcess() instead.'); } } diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index d4cfd7e9ce..2615de215d 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -4,23 +4,22 @@ import type { SQSRecord, } from 'aws-lambda'; import { BasePartialProcessor } from './BasePartialProcessor'; -import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; +import { DEFAULT_RESPONSE, EventType } from './constants'; import { BatchProcessingError } from './errors'; -import type { - EventSourceDataClassTypes, - PartialItemFailureResponse, - PartialItemFailures, -} from './types'; +import type { PartialItemFailureResponse, PartialItemFailures } from './types'; /** - * Process batch and partially report failed items + * Abstract class to process a batch of records and report partial failures */ abstract class BasePartialBatchProcessor extends BasePartialProcessor { - public COLLECTOR_MAPPING; - - public batchResponse: PartialItemFailureResponse; - - public eventType: keyof typeof EventType; + /** + * Response object to be used in reporting partial failures + */ + protected batchResponse: PartialItemFailureResponse; + /** + * The type of event that triggered the Lambda function + */ + private eventType: keyof typeof EventType; /** * Initializes base batch processing class @@ -29,18 +28,23 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { public constructor(eventType: keyof typeof EventType) { super(); this.eventType = eventType; - this.batchResponse = DEFAULT_RESPONSE; - this.COLLECTOR_MAPPING = { - [EventType.SQS]: () => this.collectSqsFailures(), - [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), - [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), - }; + this.batchResponse = { ...DEFAULT_RESPONSE }; + } + + /** + * Return response object to be used in reporting partial failures + */ + public response(): PartialItemFailureResponse { + return this.batchResponse; } /** - * Report messages to be deleted in case of partial failures + * Perfom cleanup after processing a batch of records. + * + * If the entire batch failed, throw an error. Otherwise, + * prepare the response object to be used in reporting partial failures. */ - public clean(): void { + protected clean(): void { if (!this.hasMessagesToReport()) { return; } @@ -48,27 +52,25 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { if (this.entireBatchFailed()) { throw new BatchProcessingError( 'All records failed processing. ' + - this.exceptions.length + + this.errors.length + ' individual errors logged separately below.', - this.exceptions + this.errors ); } - const messages: PartialItemFailures[] = this.getMessagesToReport(); - this.batchResponse = { batchItemFailures: messages }; + this.batchResponse = { batchItemFailures: this.getMessagesToReport() }; } /** - * Collects identifiers of failed items for a DynamoDB stream - * @returns list of identifiers for failed items + * Collect the identifiers of failed items for a DynamoDB stream. */ - public collectDynamoDBFailures(): PartialItemFailures[] { + protected collectDynamoDBFailures(): PartialItemFailures[] { const failures: PartialItemFailures[] = []; - for (const msg of this.failureMessages) { - const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; - if (msgId) { - failures.push({ itemIdentifier: msgId }); + for (const message of this.failureMessages) { + const messageId = (message as DynamoDBRecord).dynamodb?.SequenceNumber; + if (messageId) { + failures.push({ itemIdentifier: messageId }); } } @@ -76,81 +78,66 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { } /** - * Collects identifiers of failed items for a Kinesis stream - * @returns list of identifiers for failed items + * Collect the identifiers of failed items for a Kinesis data stream. */ - public collectKinesisFailures(): PartialItemFailures[] { - const failures: PartialItemFailures[] = []; - - for (const msg of this.failureMessages) { - const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; - failures.push({ itemIdentifier: msgId }); - } - - return failures; + protected collectKinesisFailures(): PartialItemFailures[] { + return this.failureMessages.map((message) => { + const { + kinesis: { sequenceNumber }, + } = message as KinesisStreamRecord; + + return { itemIdentifier: sequenceNumber }; + }); } /** - * Collects identifiers of failed items for an SQS batch - * @returns list of identifiers for failed items + * Collect the identifiers of failed items for a SQS queue. */ - public collectSqsFailures(): PartialItemFailures[] { - const failures: PartialItemFailures[] = []; - - for (const msg of this.failureMessages) { - const msgId = (msg as SQSRecord).messageId; - failures.push({ itemIdentifier: msgId }); - } + protected collectSqsFailures(): PartialItemFailures[] { + return this.failureMessages.map((message) => { + const { messageId } = message as SQSRecord; - return failures; + return { itemIdentifier: messageId }; + }); } /** - * Determines whether all records in a batch failed to process - * @returns true if all records resulted in exception results + * Determine whether the entire batch failed to be processed. */ - public entireBatchFailed(): boolean { - return this.exceptions.length == this.records.length; + protected entireBatchFailed(): boolean { + return this.errors.length === this.records.length; } /** - * Collects identifiers for failed batch items - * @returns formatted messages to use in batch deletion + * Collect all failed messages and returns them as a list of partial failures + * according to the event type. */ - public getMessagesToReport(): PartialItemFailures[] { - return this.COLLECTOR_MAPPING[this.eventType](); + protected getMessagesToReport(): PartialItemFailures[] { + switch (this.eventType) { + case EventType.SQS: + return this.collectSqsFailures(); + case EventType.KinesisDataStreams: + return this.collectKinesisFailures(); + case EventType.DynamoDBStreams: + return this.collectDynamoDBFailures(); + } } /** - * Determines if any records failed to process - * @returns true if any records resulted in exception + * Determine whether there are any failed messages to report as partial failures. */ - public hasMessagesToReport(): boolean { + protected hasMessagesToReport(): boolean { return this.failureMessages.length != 0; } /** - * Remove results from previous execution + * Prepare class instance for processing a new batch of records. */ - public prepare(): void { + protected prepare(): void { this.successMessages.length = 0; this.failureMessages.length = 0; - this.exceptions.length = 0; - this.batchResponse = DEFAULT_RESPONSE; - } - - /** - * @returns Batch items that failed processing, if any - */ - public response(): PartialItemFailureResponse { - return this.batchResponse; - } - - public toBatchType( - record: EventSourceDataClassTypes, - eventType: keyof typeof EventType - ): SQSRecord | KinesisStreamRecord | DynamoDBRecord { - return DATA_CLASS_MAPPING[eventType](record); + this.errors.length = 0; + this.batchResponse = { ...DEFAULT_RESPONSE }; } } diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index ecd62c29b0..9611c2c91a 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -1,42 +1,57 @@ import type { - BaseRecord, BatchProcessingOptions, - EventSourceDataClassTypes, + EventSourceType, FailureResponse, - ResultType, SuccessResponse, } from './types'; +import { EventType } from './constants'; /** * Abstract class for batch processors. + * + * The class provides a common interface for processing batches of records. */ abstract class BasePartialProcessor { - public exceptions: Error[]; - - public failureMessages: EventSourceDataClassTypes[]; - - public handler: CallableFunction; - - public options?: BatchProcessingOptions; - - public records: BaseRecord[]; - - public successMessages: EventSourceDataClassTypes[]; - /** - * Initializes base processor class + * List of records that failed processing with their corresponding error + */ + public failureMessages: EventSourceType[]; + /** + * List of records that succeeded processing with their corresponding result + */ + public successMessages: EventSourceType[]; + /** + * List of errors that were thrown during processing + */ + protected errors: Error[]; + /** + * Handler used to process records + */ + protected handler: CallableFunction; + /** + * Options passed to the handler */ + protected options?: BatchProcessingOptions; + /** + * List of records to be processed + */ + protected records: EventSourceType[]; + public constructor() { this.successMessages = []; this.failureMessages = []; - this.exceptions = []; + this.errors = []; this.records = []; this.handler = new Function(); } /** - * Call instance's handler for each record - * @returns List of processed records + * Process a batch of records asynchronously. + * + * When processing a batch of records, this processor will call the + * handler on each record and then store the promise returned by each call. + * + * It then waits for all promises to resolve and returns a list of results. */ public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> { /** @@ -60,44 +75,19 @@ abstract class BasePartialProcessor { } /** - * Process a record with an asyncronous handler + * Abstract method to process a record asynchronously. * * @param record Record to be processed */ public abstract asyncProcessRecord( - record: BaseRecord + record: EventSourceType ): Promise; /** - * Clean class instance after processing - */ - public abstract clean(): void; - - /** - * Keeps track of batch records that failed processing - * @param record record that failed processing - * @param exception exception that was thrown - * @returns FailureResponse object with ["fail", exception, original record] - */ - public failureHandler( - record: EventSourceDataClassTypes, - exception: Error - ): FailureResponse { - const entry: FailureResponse = ['fail', exception.message, record]; - this.exceptions.push(exception); - this.failureMessages.push(record); - - return entry; - } - - /** - * Prepare class instance before processing - */ - public abstract prepare(): void; - - /** - * Call instance's handler for each record - * @returns List of processed records + * Process a batch of records synchronously. + * + * When processing a batch of records, this processor will call the + * handler on each record sequentially and then return a list of results. */ public process(): (SuccessResponse | FailureResponse)[] { /** @@ -120,43 +110,82 @@ abstract class BasePartialProcessor { } /** - * Process a record with the handler + * Abstract method to process a record synchronously. + * * @param record Record to be processed */ public abstract processRecord( - record: BaseRecord + record: EventSourceType ): SuccessResponse | FailureResponse; /** - * Set class instance attributes before execution - * @param records List of records to be processed - * @param handler CallableFunction to process entries of "records" - * @returns this object + * Register a batch of records to be processed and the handler to process them. + * + * @param records Batch of records to be processed + * @param handler Function to process each record + * @param options Options to be passed to the handler, such as the AWS Lambda context */ public register( - records: BaseRecord[], + records: EventSourceType[], handler: CallableFunction, options?: BatchProcessingOptions - ): BasePartialProcessor { + ): void { + if (!records) { + const eventTypes: string = Object.values(EventType).toString(); + throw new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' + ); + } + this.records = records; this.handler = handler; + this.options = options; + } - if (options) { - this.options = options; - } + /** + * Abstract method to clean up class instance after processing. + * + * This method is called once, after processing the batch. + * It can be used to clean up state or perform any side effects. + */ + protected abstract clean(): void; + + /** + * Store the result of a failed record processing + * + * @param record Record that was processed + * @param error Error thrown by record handler + */ + protected failureHandler( + record: EventSourceType, + error: Error + ): FailureResponse { + const entry: FailureResponse = ['fail', error.message, record]; + this.errors.push(error); + this.failureMessages.push(record); - return this; + return entry; } /** - * Keeps track of batch records that were processed successfully - * @param record record that succeeded processing - * @param result result from record handler - * @returns SuccessResponse object with ["success", result, original record] + * Abstract method to prepare class instance before processing. + * + * This method is called once, before processing the batch. + * It can be used to initialize state or perform any side effects. + */ + protected abstract prepare(): void; + + /** + * Store the result of a successful record processing + * + * @param record Record that was processed + * @param result Result returned by record handler */ - public successHandler( - record: EventSourceDataClassTypes, - result: ResultType + protected successHandler( + record: EventSourceType, + result: unknown ): SuccessResponse { const entry: SuccessResponse = ['success', result, record]; this.successMessages.push(record); diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 3d2a75a8da..473e6a1081 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,25 +1,50 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; +import type { + EventSourceType, + FailureResponse, + SuccessResponse, +} from './types'; /** - * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB + * Synchronously process a batch of records from SQS, Kinesis Data Streams, and DynamoDB and report partial failures + * using native responses. + * + * When processing a batch of records, this processor will handle partial failures and + * return a response object that can be used to report partial failures and avoid reprocessing + * the same records. + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * } from '@aws-lambda-powertools/batch'; + * + * const processor = new BatchProcessor(EventType.SQS); + * ``` */ class BatchProcessor extends BasePartialBatchProcessor { + /** + * Process a record asynchronously using the provided handler. + * + * Throws an error if called on an async processor. Please use `processRecord()` instead. + */ public async asyncProcessRecord( - _record: BaseRecord + _record: EventSourceType ): Promise { throw new Error('Not implemented. Use process() instead.'); } /** - * Process a record with instance's handler - * @param record Batch record to be processed - * @returns response of success or failure + * Process a record synchronously using the provided handler. + * + * @param record Record to process within the batch */ - public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { + public processRecord( + record: EventSourceType + ): SuccessResponse | FailureResponse { try { - const data = this.toBatchType(record, this.eventType); - const result = this.handler(data, this.options); + const result = this.handler(record, this.options?.context); return this.successHandler(record, result); } catch (error) { diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 0c10993273..4ea60e0f08 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -3,9 +3,19 @@ import { EventType } from './constants'; import type { FailureResponse, SuccessResponse } from './types'; /** - * Process native partial responses from SQS FIFO queues - * Stops processing records when the first record fails - * The remaining records are reported as failed items + * Process a batch of records from SQS FIFO queues and report partial failures using native responses. + * + * When processing a batch of records, this processor will mark records as failed when the first failure is detected. + * This is done to preserve the order of messages in the FIFO queue. + * + * @example + * ```typescript + * import { + * SqsFifoPartialProcessor, + * } from '@aws-lambda-powertools/batch'; + * + * const processor = new SqsFifoPartialProcessor(); + * ``` */ class SqsFifoPartialProcessor extends BatchProcessor { public constructor() { @@ -13,24 +23,22 @@ class SqsFifoPartialProcessor extends BatchProcessor { } /** - * Call instance's handler for each record. - * When the first failed message is detected, the process is short-circuited - * And the remaining messages are reported as failed items + * Process a batch of records synchronously. + * + * Since this is a FIFO processor, it will stop processing records when the first record fails. */ public process(): (SuccessResponse | FailureResponse)[] { this.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; - let currentIndex = 0; - for (const record of this.records) { + for (const [index, record] of this.records.entries()) { // If we have any failed messages, it means the last message failed // We should then short circuit the process and fail remaining messages - if (this.failureMessages.length != 0) { - return this.shortCircuitProcessing(currentIndex, processedRecords); + if (this.failureMessages.length > 0) { + return this.shortCircuitProcessing(index, processedRecords); } processedRecords.push(this.processRecord(record)); - currentIndex++; } this.clean(); @@ -39,9 +47,11 @@ class SqsFifoPartialProcessor extends BatchProcessor { } /** - * Starting from the first failure index, fail all remaining messages and append them to the result list - * @param firstFailureIndex Index of first message that failed - * @param result List of success and failure responses with remaining messages failed + * Short circuit processing of remaining messages when the first failure is detected. + * + * Starting from the index of the first failure, mark all remaining messages as failed. + * + * @param firstFailureIndex Index of the first failed message */ public shortCircuitProcessing( firstFailureIndex: number, @@ -50,10 +60,9 @@ class SqsFifoPartialProcessor extends BatchProcessor { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - const data = this.toBatchType(record, this.eventType); processedRecords.push( this.failureHandler( - data, + record, new Error('A previous record failed processing') ) ); diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/asyncProcessPartialResponse.ts index eee584ed1f..8499735b61 100644 --- a/packages/batch/src/asyncProcessPartialResponse.ts +++ b/packages/batch/src/asyncProcessPartialResponse.ts @@ -1,33 +1,60 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { EventType } from './constants'; import type { - BaseRecord, + EventSourceType, BatchProcessingOptions, PartialItemFailureResponse, } from './types'; /** - * Higher level function to handle batch event processing - * @param event Lambda's original event - * @param recordHandler Callable function to process each record from the batch - * @param processor Batch processor to handle partial failure cases - * @returns Lambda Partial Batch Response + * Higher level function to handle batch event processing using an asynchronous handler. + * + * This function is used to process a batch of records and report partial failures from SQS, Kinesis Data Streams, and DynamoDB. + * It returns a response object that can be used to report partial failures and avoid reprocessing the same records. + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * asyncProcessPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { + * SQSEvent, + * SQSRecord, + * SQSBatchResponse, + * Context, + * } from 'aws-lambda'; + * + * const processor = new BatchProcessor(EventType.SQS); + * + * const recordHandler = async (_record: SQSRecord): Promise => { + * // your record processing logic + * }; + * + * export const handler = async ( + * event: SQSEvent, + * context: Context + * ): Promise => { + * return await asyncProcessPartialResponse( + * event, + * recordHandler, + * processor, + * { context }, + * ); + * }; + * ``` + * + * @param event Original event from AWS Lambda containing batch of records + * @param recordHandler Asynchronous function to process each record in the batch + * @param processor Batch processor instance to handle partial failure cases + * @param options Optional batch processing options, such as context */ const asyncProcessPartialResponse = async ( - event: { Records: BaseRecord[] }, + event: { Records: EventSourceType[] }, recordHandler: CallableFunction, processor: BasePartialBatchProcessor, options?: BatchProcessingOptions ): Promise => { - if (!event.Records) { - const eventTypes: string = Object.values(EventType).toString(); - throw new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ); - } - processor.register(event.Records, recordHandler, options); await processor.asyncProcess(); diff --git a/packages/batch/src/constants.ts b/packages/batch/src/constants.ts index 02437e356c..49fe6917cf 100644 --- a/packages/batch/src/constants.ts +++ b/packages/batch/src/constants.ts @@ -1,25 +1,40 @@ -import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import type { - PartialItemFailureResponse, - EventSourceDataClassTypes, -} from './types'; +import type { PartialItemFailureResponse } from './types'; +/** + * Event types supported by the Batch Processing utility. + * + * @example + * ```typescript + * import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch'; + * + * const sqsProcessor = new BatchProcessor(EventType.SQS); + * const kinesisProcessor = new BatchProcessor(EventType.KinesisDataStreams); + * const dynamoDBStreamProcessor = new BatchProcessor(EventType.DynamoDBStreams); + * ``` + */ const EventType = { SQS: 'SQS', KinesisDataStreams: 'KinesisDataStreams', DynamoDBStreams: 'DynamoDBStreams', } as const; +/** + * @internal + * + * Result types supported by the Batch Processing utility. + */ +const ResultType = { + Success: 'success', + Failure: 'fail', +} as const; + +/** + * @internal + * + * Default response for batch processing failures. + */ const DEFAULT_RESPONSE: PartialItemFailureResponse = { batchItemFailures: [], }; -const DATA_CLASS_MAPPING = { - [EventType.SQS]: (record: EventSourceDataClassTypes) => record as SQSRecord, - [EventType.KinesisDataStreams]: (record: EventSourceDataClassTypes) => - record as KinesisStreamRecord, - [EventType.DynamoDBStreams]: (record: EventSourceDataClassTypes) => - record as DynamoDBRecord, -}; - -export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING }; +export { EventType, ResultType, DEFAULT_RESPONSE }; diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index ed5bd4fc9e..19133727b2 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -1,15 +1,13 @@ /** - * Base error type for batch processing + * Base error type for the Batch Processing utility. + * * All errors thrown by major failures extend this base class */ class BaseBatchProcessingError extends Error { public childErrors: Error[]; - public msg: string; - - public constructor(msg: string, childErrors: Error[]) { - super(msg); - this.msg = msg; + public constructor(message: string, childErrors: Error[]) { + super(message); this.childErrors = childErrors; } @@ -17,14 +15,15 @@ class BaseBatchProcessingError extends Error { * Generates a list of errors that were generated by the major failure * @returns Formatted string listing all the errors that occurred * - * @example * When all batch records fail to be processed, this will generate a string like: + * + * @example * All records failed processing. 3 individual errors logged separately below. * ,Failed to process record. * ,Failed to process record. * ,Failed to process record. */ - public formatErrors(parentErrorString: string): string { + protected formatErrors(parentErrorString: string): string { const errorList: string[] = [parentErrorString + '\n']; for (const error of this.childErrors) { @@ -36,11 +35,11 @@ class BaseBatchProcessingError extends Error { } /** - * When all batch records failed to be processed + * Error thrown when all records in a batch fail to be processed. */ class BatchProcessingError extends BaseBatchProcessingError { - public constructor(msg: string, childErrors: Error[]) { - super(msg, childErrors); + public constructor(message: string, childErrors: Error[]) { + super(message, childErrors); const parentErrorString: string = this.message; this.message = this.formatErrors(parentErrorString); } diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index d09e7be6b9..c4736ceb24 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -1,33 +1,55 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { EventType } from './constants'; import type { - BaseRecord, + EventSourceType, BatchProcessingOptions, PartialItemFailureResponse, } from './types'; /** - * Higher level function to handle batch event processing - * @param event Lambda's original event - * @param recordHandler Callable function to process each record from the batch - * @param processor Batch processor to handle partial failure cases - * @returns Lambda Partial Batch Response + * Higher level function to handle batch event processing using a synchronous handler. + * + * This function is used to process a batch of records and report partial failures from SQS, Kinesis Data Streams, and DynamoDB. + * It returns a response object that can be used to report partial failures and avoid reprocessing the same records. + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { + * SQSEvent, + * SQSRecord, + * SQSBatchResponse, + * Context, + * } from 'aws-lambda'; + * + * const processor = new BatchProcessor(EventType.SQS); + * + * const recordHandler = (_record: SQSRecord): void => { + * // your record processing logic + * }; + * + * export const handler = async ( + * event: SQSEvent, + * context: Context + * ): Promise => { + * return processPartialResponse(event, recordHandler, processor, { context }); + * }; + * ``` + * + * @param event Original event from AWS Lambda containing batch of records + * @param recordHandler Synchronous function to process each record in the batch + * @param processor Batch processor instance to handle partial failure cases + * @param options Optional batch processing options, such as context */ const processPartialResponse = ( - event: { Records: BaseRecord[] }, + event: { Records: EventSourceType[] }, recordHandler: CallableFunction, processor: BasePartialBatchProcessor, options?: BatchProcessingOptions ): PartialItemFailureResponse => { - if (!event.Records) { - const eventTypes: string = Object.values(EventType).toString(); - throw new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ); - } - processor.register(event.Records, recordHandler, options); processor.process(); diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 17ce3633c7..ae7027c542 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -1,34 +1,55 @@ -import { +import type { Context, DynamoDBRecord, KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import { ResultType } from './constants'; +/** + * Additional options for batch processing. + * + * Useful for passing the AWS Lambda context object to the handler. + */ type BatchProcessingOptions = { + /** + * The AWS Lambda context object. + * @see https://docs.aws.amazon.com/lambda/latest/dg/nodejs-context.html + */ context: Context; }; -type EventSourceDataClassTypes = - | SQSRecord - | KinesisStreamRecord - | DynamoDBRecord; +/** + * The type of event that triggered the Lambda function. + * @example 'SQSRecord', 'DynamoDBRecord', 'KinesisStreamRecord' + */ +type EventSourceType = SQSRecord | KinesisStreamRecord | DynamoDBRecord; -type RecordValue = unknown; -type BaseRecord = { [key: string]: RecordValue } | EventSourceDataClassTypes; - -type ResultType = unknown; -type SuccessResponse = ['success', ResultType, EventSourceDataClassTypes]; - -type FailureResponse = ['fail', string, EventSourceDataClassTypes]; +/** + * Successful response from processing a record in a batch. + * + * It contains the result of the handler and the original record. + */ +type SuccessResponse = readonly ['success', unknown, EventSourceType]; +/** + * Failed response from processing a record in a batch. + * + * It contains the error message and the original record. + */ +type FailureResponse = readonly ['fail', string, EventSourceType]; +/** + * Type that identifies a partial failure response. + */ type PartialItemFailures = { itemIdentifier: string }; +/** + * Response object to be used in reporting partial failures. + */ type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; export type { BatchProcessingOptions, - BaseRecord, - EventSourceDataClassTypes, + EventSourceType, ResultType, SuccessResponse, FailureResponse, diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index 7df6110742..ae2673540e 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -5,6 +5,13 @@ import type { } from 'aws-lambda'; import { randomInt, randomUUID } from 'node:crypto'; +/** + * Factory function for creating SQS records. + * + * Used for testing purposes. + * + * @param body The body of the record. + */ const sqsRecordFactory = (body: string): SQSRecord => { return { messageId: randomUUID(), @@ -24,23 +31,30 @@ const sqsRecordFactory = (body: string): SQSRecord => { }; }; +/** + * Factory function for creating Kinesis records. + * + * Used for testing purposes. + * + * @param body The body of the record. + */ const kinesisRecordFactory = (body: string): KinesisStreamRecord => { - let seq = ''; + let sequenceNumber = ''; for (let i = 0; i < 52; i++) { - seq = seq + randomInt(10); + sequenceNumber = sequenceNumber + randomInt(10); } return { kinesis: { kinesisSchemaVersion: '1.0', partitionKey: '1', - sequenceNumber: seq, + sequenceNumber, data: body, approximateArrivalTimestamp: 1545084650.987, }, eventSource: 'aws:kinesis', eventVersion: '1.0', - eventID: 'shardId-000000000006:' + seq, + eventID: 'shardId-000000000006:' + sequenceNumber, eventName: 'aws:kinesis:record', invokeIdentityArn: 'arn:aws:iam::123456789012:role/lambda-role', awsRegion: 'us-east-2', @@ -49,10 +63,17 @@ const kinesisRecordFactory = (body: string): KinesisStreamRecord => { }; }; +/** + * Factory function for creating DynamoDB Stream records. + * + * Used for testing purposes. + * + * @param body The body of the record. + */ const dynamodbRecordFactory = (body: string): DynamoDBRecord => { - let seq = ''; + let sequenceNumber = ''; for (let i = 0; i < 10; i++) { - seq = seq + randomInt(10); + sequenceNumber = sequenceNumber + randomInt(10); } return { @@ -62,7 +83,7 @@ const dynamodbRecordFactory = (body: string): DynamoDBRecord => { Keys: { Id: { N: '101' } }, NewImage: { Message: { S: body } }, StreamViewType: 'NEW_AND_OLD_IMAGES', - SequenceNumber: seq, + SequenceNumber: sequenceNumber, SizeBytes: 26, }, awsRegion: 'us-west-2', diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts index 3a6d17b76a..c34321a381 100644 --- a/packages/batch/tests/helpers/handlers.ts +++ b/packages/batch/tests/helpers/handlers.ts @@ -1,10 +1,17 @@ +import type { Context } from 'aws-lambda'; import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; -import type { BatchProcessingOptions } from '../../src/types'; +/** + * Test handler for SQS records. + * + * Used for testing the SQS batch processor with a synchronous handler. + * + * @param record The SQS record to process. + */ const sqsRecordHandler = (record: SQSRecord): string => { const body = record.body; if (body.includes('fail')) { @@ -14,15 +21,29 @@ const sqsRecordHandler = (record: SQSRecord): string => { return body; }; +/** + * Test handler for SQS records. + * + * Used for testing the SQS batch processor with an asynchronous handler. + * + * @param record The SQS record to process. + */ const asyncSqsRecordHandler = async (record: SQSRecord): Promise => { const body = record.body; if (body.includes('fail')) { throw Error('Failed to process record.'); } - return body; + return Promise.resolve(body); }; +/** + * Test handler for Kinesis records. + * + * Used for testing the Kinesis batch processor with a synchronous handler. + * + * @param record The Kinesis record to process. + */ const kinesisRecordHandler = (record: KinesisStreamRecord): string => { const body = record.kinesis.data; if (body.includes('fail')) { @@ -32,6 +53,13 @@ const kinesisRecordHandler = (record: KinesisStreamRecord): string => { return body; }; +/** + * Test handler for Kinesis records. + * + * Used for testing the Kinesis batch processor with an asynchronous handler. + * + * @param record The Kinesis record to process. + */ const asyncKinesisRecordHandler = async ( record: KinesisStreamRecord ): Promise => { @@ -40,9 +68,16 @@ const asyncKinesisRecordHandler = async ( throw Error('Failed to process record.'); } - return body; + return Promise.resolve(body); }; +/** + * Test handler for DynamoDB records. + * + * Used for testing the DynamoDB batch processor with a synchronous handler. + * + * @param record The DynamoDB record to process. + */ const dynamodbRecordHandler = (record: DynamoDBRecord): object => { const body = record.dynamodb?.NewImage?.Message || { S: 'fail' }; if (body['S']?.includes('fail')) { @@ -52,6 +87,13 @@ const dynamodbRecordHandler = (record: DynamoDBRecord): object => { return body; }; +/** + * Test handler for DynamoDB records. + * + * Used for testing the DynamoDB batch processor with an asynchronous handler. + * + * @param record The DynamoDB record to process. + */ const asyncDynamodbRecordHandler = async ( record: DynamoDBRecord ): Promise => { @@ -60,15 +102,19 @@ const asyncDynamodbRecordHandler = async ( throw Error('Failed to process record.'); } - return body; + return Promise.resolve(body); }; -const handlerWithContext = ( - record: SQSRecord, - options: BatchProcessingOptions -): string => { - const context = options.context; - +/** + * Test handler for SQS records. + * + * Used for testing the SQS batch processor with a synchronous handler, it + * also tests that the context object is passed to the handler. + * + * @param record The SQS record to process. + * @param context The AWS Lambda context object passed to the handler. + */ +const handlerWithContext = (record: SQSRecord, context: Context): string => { try { if (context.getRemainingTimeInMillis() == 0) { throw Error('No time remaining.'); @@ -80,12 +126,19 @@ const handlerWithContext = ( return record.body; }; +/** + * Test handler for SQS records. + * + * Used for testing the SQS batch processor with a asynchronous handler, it + * also tests that the context object is passed to the handler. + * + * @param record The SQS record to process. + * @param context The AWS Lambda context object passed to the handler. + */ const asyncHandlerWithContext = async ( record: SQSRecord, - options: BatchProcessingOptions + context: Context ): Promise => { - const context = options.context; - try { if (context.getRemainingTimeInMillis() == 0) { throw Error('No time remaining.');