Skip to content

docs(batch): create API docs for the utility #1621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions packages/batch/src/AsyncBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,39 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';
import type {
EventSourceType,
FailureResponse,
SuccessResponse,
} from './types';

/**
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
* Asynchronously process a batch of records from SQS, Kinesis Data Streams, and DynamoDB and report partial failures
* using native responses.
*
* When processing a batch of records, this processor will handle partial failures and
* return a response object that can be used to report partial failures and avoid reprocessing
* the same records.
*
* @example
* ```typescript
* import {
* AsyncBatchProcessor,
* EventType,
* } from '@aws-lambda-powertools/batch';
*
* const processor = new AsyncBatchProcessor(EventType.SQS);
* ```
*/
class AsyncBatchProcessor extends BasePartialBatchProcessor {
/**
* Process a record asynchronously using the provided handler.
*
* @param record Record to process within the batch
*/
public async asyncProcessRecord(
record: BaseRecord
record: EventSourceType
): Promise<SuccessResponse | FailureResponse> {
try {
const data = this.toBatchType(record, this.eventType);
const result = await this.handler(data, this.options);
const result = await this.handler(record, this.options?.context);

return this.successHandler(record, result);
} catch (error) {
Expand All @@ -19,11 +42,13 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor {
}

/**
* Process a record with instance's handler
* @param record Batch record to be processed
* @returns response of success or failure
* Process a record synchronously using the provided handler.
*
* Throws an error if called on an async processor. Please use `asyncProcessRecord()` instead.
*/
public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
public processRecord(
_record: EventSourceType
): SuccessResponse | FailureResponse {
throw new Error('Not implemented. Use asyncProcess() instead.');
}
}
Expand Down
149 changes: 68 additions & 81 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import type {
SQSRecord,
} from 'aws-lambda';
import { BasePartialProcessor } from './BasePartialProcessor';
import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants';
import { DEFAULT_RESPONSE, EventType } from './constants';
import { BatchProcessingError } from './errors';
import type {
EventSourceDataClassTypes,
PartialItemFailureResponse,
PartialItemFailures,
} from './types';
import type { PartialItemFailureResponse, PartialItemFailures } from './types';

/**
* Process batch and partially report failed items
* Abstract class to process a batch of records and report partial failures
*/
abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;

public batchResponse: PartialItemFailureResponse;

public eventType: keyof typeof EventType;
/**
* Response object to be used in reporting partial failures
*/
protected batchResponse: PartialItemFailureResponse;
/**
* The type of event that triggered the Lambda function
*/
private eventType: keyof typeof EventType;

/**
* Initializes base batch processing class
Expand All @@ -29,128 +28,116 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public constructor(eventType: keyof typeof EventType) {
super();
this.eventType = eventType;
this.batchResponse = DEFAULT_RESPONSE;
this.COLLECTOR_MAPPING = {
[EventType.SQS]: () => this.collectSqsFailures(),
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
};
this.batchResponse = { ...DEFAULT_RESPONSE };
}

/**
* Return response object to be used in reporting partial failures
*/
public response(): PartialItemFailureResponse {
return this.batchResponse;
}

/**
* Report messages to be deleted in case of partial failures
* Perfom cleanup after processing a batch of records.
*
* If the entire batch failed, throw an error. Otherwise,
* prepare the response object to be used in reporting partial failures.
*/
public clean(): void {
protected clean(): void {
if (!this.hasMessagesToReport()) {
return;
}

if (this.entireBatchFailed()) {
throw new BatchProcessingError(
'All records failed processing. ' +
this.exceptions.length +
this.errors.length +
' individual errors logged separately below.',
this.exceptions
this.errors
);
}

const messages: PartialItemFailures[] = this.getMessagesToReport();
this.batchResponse = { batchItemFailures: messages };
this.batchResponse = { batchItemFailures: this.getMessagesToReport() };
}

/**
* Collects identifiers of failed items for a DynamoDB stream
* @returns list of identifiers for failed items
* Collect the identifiers of failed items for a DynamoDB stream.
*/
public collectDynamoDBFailures(): PartialItemFailures[] {
protected collectDynamoDBFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber;
if (msgId) {
failures.push({ itemIdentifier: msgId });
for (const message of this.failureMessages) {
const messageId = (message as DynamoDBRecord).dynamodb?.SequenceNumber;
if (messageId) {
failures.push({ itemIdentifier: messageId });
}
}

return failures;
}

/**
* Collects identifiers of failed items for a Kinesis stream
* @returns list of identifiers for failed items
* Collect the identifiers of failed items for a Kinesis data stream.
*/
public collectKinesisFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber;
failures.push({ itemIdentifier: msgId });
}

return failures;
protected collectKinesisFailures(): PartialItemFailures[] {
return this.failureMessages.map((message) => {
const {
kinesis: { sequenceNumber },
} = message as KinesisStreamRecord;

return { itemIdentifier: sequenceNumber };
});
}

/**
* Collects identifiers of failed items for an SQS batch
* @returns list of identifiers for failed items
* Collect the identifiers of failed items for a SQS queue.
*/
public collectSqsFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as SQSRecord).messageId;
failures.push({ itemIdentifier: msgId });
}
protected collectSqsFailures(): PartialItemFailures[] {
return this.failureMessages.map((message) => {
const { messageId } = message as SQSRecord;

return failures;
return { itemIdentifier: messageId };
});
}

/**
* Determines whether all records in a batch failed to process
* @returns true if all records resulted in exception results
* Determine whether the entire batch failed to be processed.
*/
public entireBatchFailed(): boolean {
return this.exceptions.length == this.records.length;
protected entireBatchFailed(): boolean {
return this.errors.length === this.records.length;
}

/**
* Collects identifiers for failed batch items
* @returns formatted messages to use in batch deletion
* Collect all failed messages and returns them as a list of partial failures
* according to the event type.
*/
public getMessagesToReport(): PartialItemFailures[] {
return this.COLLECTOR_MAPPING[this.eventType]();
protected getMessagesToReport(): PartialItemFailures[] {
switch (this.eventType) {
case EventType.SQS:
return this.collectSqsFailures();
case EventType.KinesisDataStreams:
return this.collectKinesisFailures();
case EventType.DynamoDBStreams:
return this.collectDynamoDBFailures();
}
}

/**
* Determines if any records failed to process
* @returns true if any records resulted in exception
* Determine whether there are any failed messages to report as partial failures.
*/
public hasMessagesToReport(): boolean {
protected hasMessagesToReport(): boolean {
return this.failureMessages.length != 0;
}

/**
* Remove results from previous execution
* Prepare class instance for processing a new batch of records.
*/
public prepare(): void {
protected prepare(): void {
this.successMessages.length = 0;
this.failureMessages.length = 0;
this.exceptions.length = 0;
this.batchResponse = DEFAULT_RESPONSE;
}

/**
* @returns Batch items that failed processing, if any
*/
public response(): PartialItemFailureResponse {
return this.batchResponse;
}

public toBatchType(
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType
): SQSRecord | KinesisStreamRecord | DynamoDBRecord {
return DATA_CLASS_MAPPING[eventType](record);
this.errors.length = 0;
this.batchResponse = { ...DEFAULT_RESPONSE };
}
}

Expand Down
Loading