@@ -4,23 +4,22 @@ import type {
4
4
SQSRecord ,
5
5
} from 'aws-lambda' ;
6
6
import { BasePartialProcessor } from './BasePartialProcessor' ;
7
- import { DATA_CLASS_MAPPING , DEFAULT_RESPONSE , EventType } from './constants' ;
7
+ import { DEFAULT_RESPONSE , EventType } from './constants' ;
8
8
import { BatchProcessingError } from './errors' ;
9
- import type {
10
- EventSourceDataClassTypes ,
11
- PartialItemFailureResponse ,
12
- PartialItemFailures ,
13
- } from './types' ;
9
+ import type { PartialItemFailureResponse , PartialItemFailures } from './types' ;
14
10
15
11
/**
16
- * Process batch and partially report failed items
12
+ * Abstract class to process a batch of records and report partial failures
17
13
*/
18
14
abstract class BasePartialBatchProcessor extends BasePartialProcessor {
19
- public COLLECTOR_MAPPING ;
20
-
21
- public batchResponse : PartialItemFailureResponse ;
22
-
23
- public eventType : keyof typeof EventType ;
15
+ /**
16
+ * Response object to be used in reporting partial failures
17
+ */
18
+ protected batchResponse : PartialItemFailureResponse ;
19
+ /**
20
+ * The type of event that triggered the Lambda function
21
+ */
22
+ private eventType : keyof typeof EventType ;
24
23
25
24
/**
26
25
* Initializes base batch processing class
@@ -29,128 +28,116 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
29
28
public constructor ( eventType : keyof typeof EventType ) {
30
29
super ( ) ;
31
30
this . eventType = eventType ;
32
- this . batchResponse = DEFAULT_RESPONSE ;
33
- this . COLLECTOR_MAPPING = {
34
- [ EventType . SQS ] : ( ) => this . collectSqsFailures ( ) ,
35
- [ EventType . KinesisDataStreams ] : ( ) => this . collectKinesisFailures ( ) ,
36
- [ EventType . DynamoDBStreams ] : ( ) => this . collectDynamoDBFailures ( ) ,
37
- } ;
31
+ this . batchResponse = { ...DEFAULT_RESPONSE } ;
32
+ }
33
+
34
+ /**
35
+ * Return response object to be used in reporting partial failures
36
+ */
37
+ public response ( ) : PartialItemFailureResponse {
38
+ return this . batchResponse ;
38
39
}
39
40
40
41
/**
41
- * Report messages to be deleted in case of partial failures
42
+ * Perfom cleanup after processing a batch of records.
43
+ *
44
+ * If the entire batch failed, throw an error. Otherwise,
45
+ * prepare the response object to be used in reporting partial failures.
42
46
*/
43
- public clean ( ) : void {
47
+ protected clean ( ) : void {
44
48
if ( ! this . hasMessagesToReport ( ) ) {
45
49
return ;
46
50
}
47
51
48
52
if ( this . entireBatchFailed ( ) ) {
49
53
throw new BatchProcessingError (
50
54
'All records failed processing. ' +
51
- this . exceptions . length +
55
+ this . errors . length +
52
56
' individual errors logged separately below.' ,
53
- this . exceptions
57
+ this . errors
54
58
) ;
55
59
}
56
60
57
- const messages : PartialItemFailures [ ] = this . getMessagesToReport ( ) ;
58
- this . batchResponse = { batchItemFailures : messages } ;
61
+ this . batchResponse = { batchItemFailures : this . getMessagesToReport ( ) } ;
59
62
}
60
63
61
64
/**
62
- * Collects identifiers of failed items for a DynamoDB stream
63
- * @returns list of identifiers for failed items
65
+ * Collect the identifiers of failed items for a DynamoDB stream.
64
66
*/
65
- public collectDynamoDBFailures ( ) : PartialItemFailures [ ] {
67
+ protected collectDynamoDBFailures ( ) : PartialItemFailures [ ] {
66
68
const failures : PartialItemFailures [ ] = [ ] ;
67
69
68
- for ( const msg of this . failureMessages ) {
69
- const msgId = ( msg as DynamoDBRecord ) . dynamodb ?. SequenceNumber ;
70
- if ( msgId ) {
71
- failures . push ( { itemIdentifier : msgId } ) ;
70
+ for ( const message of this . failureMessages ) {
71
+ const messageId = ( message as DynamoDBRecord ) . dynamodb ?. SequenceNumber ;
72
+ if ( messageId ) {
73
+ failures . push ( { itemIdentifier : messageId } ) ;
72
74
}
73
75
}
74
76
75
77
return failures ;
76
78
}
77
79
78
80
/**
79
- * Collects identifiers of failed items for a Kinesis stream
80
- * @returns list of identifiers for failed items
81
+ * Collect the identifiers of failed items for a Kinesis data stream.
81
82
*/
82
- public collectKinesisFailures ( ) : PartialItemFailures [ ] {
83
- const failures : PartialItemFailures [ ] = [ ] ;
84
-
85
- for ( const msg of this . failureMessages ) {
86
- const msgId = ( msg as KinesisStreamRecord ) . kinesis . sequenceNumber ;
87
- failures . push ( { itemIdentifier : msgId } ) ;
88
- }
89
-
90
- return failures ;
83
+ protected collectKinesisFailures ( ) : PartialItemFailures [ ] {
84
+ return this . failureMessages . map ( ( message ) => {
85
+ const {
86
+ kinesis : { sequenceNumber } ,
87
+ } = message as KinesisStreamRecord ;
88
+
89
+ return { itemIdentifier : sequenceNumber } ;
90
+ } ) ;
91
91
}
92
92
93
93
/**
94
- * Collects identifiers of failed items for an SQS batch
95
- * @returns list of identifiers for failed items
94
+ * Collect the identifiers of failed items for a SQS queue.
96
95
*/
97
- public collectSqsFailures ( ) : PartialItemFailures [ ] {
98
- const failures : PartialItemFailures [ ] = [ ] ;
99
-
100
- for ( const msg of this . failureMessages ) {
101
- const msgId = ( msg as SQSRecord ) . messageId ;
102
- failures . push ( { itemIdentifier : msgId } ) ;
103
- }
96
+ protected collectSqsFailures ( ) : PartialItemFailures [ ] {
97
+ return this . failureMessages . map ( ( message ) => {
98
+ const { messageId } = message as SQSRecord ;
104
99
105
- return failures ;
100
+ return { itemIdentifier : messageId } ;
101
+ } ) ;
106
102
}
107
103
108
104
/**
109
- * Determines whether all records in a batch failed to process
110
- * @returns true if all records resulted in exception results
105
+ * Determine whether the entire batch failed to be processed.
111
106
*/
112
- public entireBatchFailed ( ) : boolean {
113
- return this . exceptions . length == this . records . length ;
107
+ protected entireBatchFailed ( ) : boolean {
108
+ return this . errors . length = == this . records . length ;
114
109
}
115
110
116
111
/**
117
- * Collects identifiers for failed batch items
118
- * @returns formatted messages to use in batch deletion
112
+ * Collect all failed messages and returns them as a list of partial failures
113
+ * according to the event type.
119
114
*/
120
- public getMessagesToReport ( ) : PartialItemFailures [ ] {
121
- return this . COLLECTOR_MAPPING [ this . eventType ] ( ) ;
115
+ protected getMessagesToReport ( ) : PartialItemFailures [ ] {
116
+ switch ( this . eventType ) {
117
+ case EventType . SQS :
118
+ return this . collectSqsFailures ( ) ;
119
+ case EventType . KinesisDataStreams :
120
+ return this . collectKinesisFailures ( ) ;
121
+ case EventType . DynamoDBStreams :
122
+ return this . collectDynamoDBFailures ( ) ;
123
+ }
122
124
}
123
125
124
126
/**
125
- * Determines if any records failed to process
126
- * @returns true if any records resulted in exception
127
+ * Determine whether there are any failed messages to report as partial failures.
127
128
*/
128
- public hasMessagesToReport ( ) : boolean {
129
+ protected hasMessagesToReport ( ) : boolean {
129
130
return this . failureMessages . length != 0 ;
130
131
}
131
132
132
133
/**
133
- * Remove results from previous execution
134
+ * Prepare class instance for processing a new batch of records.
134
135
*/
135
- public prepare ( ) : void {
136
+ protected prepare ( ) : void {
136
137
this . successMessages . length = 0 ;
137
138
this . failureMessages . length = 0 ;
138
- this . exceptions . length = 0 ;
139
- this . batchResponse = DEFAULT_RESPONSE ;
140
- }
141
-
142
- /**
143
- * @returns Batch items that failed processing, if any
144
- */
145
- public response ( ) : PartialItemFailureResponse {
146
- return this . batchResponse ;
147
- }
148
-
149
- public toBatchType (
150
- record : EventSourceDataClassTypes ,
151
- eventType : keyof typeof EventType
152
- ) : SQSRecord | KinesisStreamRecord | DynamoDBRecord {
153
- return DATA_CLASS_MAPPING [ eventType ] ( record ) ;
139
+ this . errors . length = 0 ;
140
+ this . batchResponse = { ...DEFAULT_RESPONSE } ;
154
141
}
155
142
}
156
143
0 commit comments