
Commit 3a7b67b

feat(batch): add support for batch execution in parallel with custom Executor (#1900)
* feat: add support for batch execution in parallel with custom Executor
* pmd_analyse fix
* add docs
* Update docs/utilities/batch.md

Co-authored-by: Philipp Page <[email protected]>
1 parent f8a9ede commit 3a7b67b

File tree

15 files changed: +425 / -69 lines changed


docs/utilities/batch.md

Lines changed: 25 additions & 0 deletions
@@ -510,6 +510,31 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown.
         }
     }
     ```
+=== "Example with SQS (using custom executor)"
+
+    ```java hl_lines="4 10 15"
+    public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
+
+        private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
+        private final ExecutorService executor;
+
+        public SqsBatchHandler() {
+            handler = new BatchMessageHandlerBuilder()
+                    .withSqsBatchHandler()
+                    .buildWithMessageHandler(this::processMessage, Product.class);
+            executor = Executors.newFixedThreadPool(2);
+        }
+
+        @Override
+        public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
+            return handler.processBatchInParallel(sqsEvent, context, executor);
+        }
+
+        private void processMessage(Product p, Context c) {
+            // Process the product
+        }
+    }
+    ```


 ## Handling Messages
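For context, a minimal variant of the documented SQS handler that sizes the custom pool from the available processors instead of hard-coding 2. This sketch is not part of the committed docs; the class name and the sizing choice are assumptions, while the builder calls and `Product` model come from the examples in this commit.

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class, not part of this commit.
public class SqsBatchHandlerSizedPool implements RequestHandler<SQSEvent, SQSBatchResponse> {

    private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
    // Assumption: mostly CPU-bound processing, so one worker thread per vCPU allocated to the function.
    private final ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public SqsBatchHandlerSizedPool() {
        handler = new BatchMessageHandlerBuilder()
                .withSqsBatchHandler()
                .buildWithMessageHandler(this::processMessage, Product.class);
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // The executor is a field, so it is reused across warm invocations of the function.
        return handler.processBatchInParallel(sqsEvent, context, executor);
    }

    private void processMessage(Product p, Context c) {
        // Process the product
    }
}
```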

examples/powertools-examples-batch/src/main/java/org/demo/batch/dynamo/DynamoDBStreamBatchHandler.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context contex
         return handler.processBatch(ddbEvent, context);
     }

-    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
+    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord) {
         LOGGER.info("Processing DynamoDB Stream Record" + dynamodbStreamRecord);
     }

examples/powertools-examples-batch/src/main/java/org/demo/batch/dynamo/DynamoDBStreamBatchHandlerParallel.java

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+package org.demo.batch.dynamo;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class DynamoDBStreamBatchHandlerParallel implements RequestHandler<DynamodbEvent, StreamsEventResponse> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBStreamBatchHandlerParallel.class);
+    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;
+    private final ExecutorService executor;
+
+    public DynamoDBStreamBatchHandlerParallel() {
+        handler = new BatchMessageHandlerBuilder()
+                .withDynamoDbBatchHandler()
+                .buildWithRawMessageHandler(this::processMessage);
+        executor = Executors.newFixedThreadPool(2);
+    }
+
+    @Override
+    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
+        return handler.processBatchInParallel(ddbEvent, context, executor);
+    }
+
+    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord) {
+        LOGGER.info("Processing DynamoDB Stream Record" + dynamodbStreamRecord);
+    }
+
+}

examples/powertools-examples-batch/src/main/java/org/demo/batch/kinesis/KinesisBatchHandler.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context con
         return handler.processBatch(kinesisEvent, context);
     }

-    private void processMessage(Product p, Context c) {
+    private void processMessage(Product p) {
         LOGGER.info("Processing product " + p);
     }

examples/powertools-examples-batch/src/main/java/org/demo/batch/kinesis/KinesisBatchHandlerParallel.java

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+package org.demo.batch.kinesis;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import org.demo.batch.model.Product;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class KinesisBatchHandlerParallel implements RequestHandler<KinesisEvent, StreamsEventResponse> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisBatchHandlerParallel.class);
+    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;
+    private final ExecutorService executor;
+
+
+    public KinesisBatchHandlerParallel() {
+        handler = new BatchMessageHandlerBuilder()
+                .withKinesisBatchHandler()
+                .buildWithMessageHandler(this::processMessage, Product.class);
+        executor = Executors.newFixedThreadPool(2);
+    }
+
+    @Override
+    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
+        return handler.processBatchInParallel(kinesisEvent, context, executor);
+    }
+
+    private void processMessage(Product p) {
+        LOGGER.info("Processing product " + p);
+    }
+
+}

examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java

Lines changed: 1 addition & 2 deletions
@@ -42,11 +42,10 @@ public class AbstractSqsBatchHandler {
     /**
      * Simulate some processing (I/O + S3 put request)
      * @param p deserialized product
-     * @param context Lambda context
      */
    @Logging
    @Tracing
-    protected void processMessage(Product p, Context context) {
+    protected void processMessage(Product p) {
        TracingUtils.putAnnotation("productId", p.getId());
        TracingUtils.putAnnotation("Thread", Thread.currentThread().getName());
        MDC.put("product", String.valueOf(p.getId()));
examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandlerParallel.java

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+package org.demo.batch.sqs;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import org.demo.batch.model.Product;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
+import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
+import software.amazon.lambda.powertools.logging.Logging;
+import software.amazon.lambda.powertools.tracing.Tracing;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class SqsBatchHandlerParallel extends AbstractSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandlerParallel.class);
+    private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
+    private final ExecutorService executor;
+
+    public SqsBatchHandlerParallel() {
+        handler = new BatchMessageHandlerBuilder()
+                .withSqsBatchHandler()
+                .buildWithMessageHandler(this::processMessage, Product.class);
+        executor = Executors.newFixedThreadPool(2);
+    }
+
+    @Logging
+    @Tracing
+    @Override
+    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
+        LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size());
+        return handler.processBatchInParallel(sqsEvent, context, executor);
+    }
+}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java

Lines changed: 13 additions & 0 deletions
@@ -16,6 +16,9 @@

 import com.amazonaws.services.lambda.runtime.Context;

+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
 /**
  * The basic interface a batch message handler must meet.
  *
@@ -50,4 +53,14 @@ public interface BatchMessageHandler<E, R> {
      * @return A partial batch response
      */
     R processBatchInParallel(E event, Context context);
+
+
+    /**
+     * Same as {@link #processBatchInParallel(Object, Context)} but with an option to provide a custom {@link Executor}
+     * @param event    The Lambda event containing the batch to process
+     * @param context  The Lambda context
+     * @param executor Custom executor to use for parallel processing
+     * @return A partial batch response
+     */
+    R processBatchInParallel(E event, Context context, Executor executor);
 }
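Illustrative only, not part of this commit: any `java.util.concurrent.Executor` can be passed to the new overload, so a caller could, for instance, supply a fixed pool with named worker threads to make the per-thread log output easier to follow. The class name, pool size, and thread-name prefix below are assumptions of this sketch; the builder calls mirror the example handlers above.

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of this commit.
public class NamedPoolSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

    // Gives workers stable, readable names such as "sqs-batch-1", "sqs-batch-2", ...
    private static final ThreadFactory NAMED_THREADS = new ThreadFactory() {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "sqs-batch-" + counter.incrementAndGet());
        }
    };

    private final ExecutorService executor = Executors.newFixedThreadPool(4, NAMED_THREADS);
    private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
            .withSqsBatchHandler()
            .buildWithMessageHandler(this::processMessage, Product.class);

    @Override
    public SQSBatchResponse handleRequest(SQSEvent event, Context context) {
        // Without the third argument, the parallel path uses parallelStream() (common fork-join pool);
        // with it, each record is scheduled on the supplied executor.
        return handler.processBatchInParallel(event, context, executor);
    }

    private void processMessage(Product product, Context context) {
        // Process the product
    }
}
```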

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java

Lines changed: 30 additions & 7 deletions
@@ -17,8 +17,12 @@
 import com.amazonaws.services.lambda.runtime.Context;
 import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
 import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -66,7 +70,9 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context
                 .parallelStream() // Parallel processing
                 .map(eventRecord -> {
                     multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
-                    return processBatchItem(eventRecord, context);
+                    Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
+                    multiThreadMDC.removeThread(Thread.currentThread().getName());
+                    return failureOpt;
                 })
                 .filter(Optional::isPresent)
                 .map(Optional::get)
@@ -75,6 +81,23 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context
         return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
     }

+    @Override
+    public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context, Executor executor) {
+        MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
+
+        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
+        List<CompletableFuture<Void>> futures = event.getRecords().stream()
+                .map(eventRecord -> CompletableFuture.runAsync(() -> {
+                    multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
+                    Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
+                    failureOpt.ifPresent(batchItemFailures::add);
+                    multiThreadMDC.removeThread(Thread.currentThread().getName());
+                }, executor))
+                .collect(Collectors.toList());
+        futures.forEach(CompletableFuture::join);
+        return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
+    }
+
     private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
         try {
             LOGGER.debug("Processing item {}", streamRecord.getEventID());
@@ -86,19 +109,19 @@ private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(Dynamod
                 this.successHandler.accept(streamRecord);
             }
             return Optional.empty();
-        } catch (Throwable t) {
+        } catch (Exception e) {
             String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber();
             LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
-                    sequenceNumber, t.getMessage());
-            LOGGER.error("Error was", t);
+                    sequenceNumber, e.getMessage());
+            LOGGER.error("Error was", e);

             // Report failure if we have a handler
             if (this.failureHandler != null) {
                 // A failing failure handler is no reason to fail the batch
                 try {
-                    this.failureHandler.accept(streamRecord, t);
-                } catch (Throwable t2) {
-                    LOGGER.warn("failureHandler threw handling failure", t2);
+                    this.failureHandler.accept(streamRecord, e);
+                } catch (Exception e2) {
+                    LOGGER.warn("failureHandler threw handling failure", e2);
                 }
             }
             return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build());
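To isolate the concurrency pattern the new overload above relies on: each record becomes a `CompletableFuture` scheduled on the caller's executor, every future is joined before the response is built, and only reported failures are kept. A minimal, self-contained sketch of that fan-out/join shape follows; the generic types, the synchronized list, and the `main` driver are assumptions of this sketch, not the library's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class FanOutJoinSketch {

    /** Runs one task per record on the supplied executor, waits for all, and returns the reported failures. */
    static <R, F> List<F> collectFailures(List<R> records, Function<R, Optional<F>> processOne, Executor executor) {
        // The callbacks run on executor threads, so this sketch collects into a synchronized list.
        List<F> failures = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> futures = records.stream()
                .map(record -> CompletableFuture.runAsync(
                        () -> processOne.apply(record).ifPresent(failures::add), executor))
                .collect(Collectors.toList());
        futures.forEach(CompletableFuture::join); // block until every record has been attempted
        return failures;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> failed = collectFailures(
                Arrays.asList("a", "b", "c"),
                record -> record.equals("b") ? Optional.of(record) : Optional.empty(), // "b" is reported as a failure
                executor);
        System.out.println(failed); // prints [b]
        executor.shutdown();
    }
}
```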

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java

Lines changed: 30 additions & 7 deletions
@@ -18,8 +18,12 @@
 import com.amazonaws.services.lambda.runtime.Context;
 import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
 import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -77,7 +81,9 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c
                 .parallelStream() // Parallel processing
                 .map(eventRecord -> {
                     multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
-                    return processBatchItem(eventRecord, context);
+                    Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
+                    multiThreadMDC.removeThread(Thread.currentThread().getName());
+                    return failureOpt;
                 })
                 .filter(Optional::isPresent)
                 .map(Optional::get)
@@ -86,6 +92,23 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c
         return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
     }

+    @Override
+    public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context, Executor executor) {
+        MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
+
+        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
+        List<CompletableFuture<Void>> futures = event.getRecords().stream()
+                .map(eventRecord -> CompletableFuture.runAsync(() -> {
+                    multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
+                    Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
+                    failureOpt.ifPresent(batchItemFailures::add);
+                    multiThreadMDC.removeThread(Thread.currentThread().getName());
+                }, executor))
+                .collect(Collectors.toList());
+        futures.forEach(CompletableFuture::join);
+        return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
+    }
+
     private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) {
         try {
             LOGGER.debug("Processing item {}", eventRecord.getEventID());
@@ -102,19 +125,19 @@ private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(Kinesis
                 this.successHandler.accept(eventRecord);
             }
             return Optional.empty();
-        } catch (Throwable t) {
+        } catch (Exception e) {
             String sequenceNumber = eventRecord.getEventID();
             LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures",
-                    sequenceNumber, t.getMessage());
-            LOGGER.error("Error was", t);
+                    sequenceNumber, e.getMessage());
+            LOGGER.error("Error was", e);

             // Report failure if we have a handler
             if (this.failureHandler != null) {
                 // A failing failure handler is no reason to fail the batch
                 try {
-                    this.failureHandler.accept(eventRecord, t);
-                } catch (Throwable t2) {
-                    LOGGER.warn("failureHandler threw handling failure", t2);
+                    this.failureHandler.accept(eventRecord, e);
+                } catch (Exception e2) {
+                    LOGGER.warn("failureHandler threw handling failure", e2);
                 }
             }
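The `copyMDCToThread`/`removeThread` calls above exist because SLF4J's MDC is thread-local: diagnostic context set on the Lambda invocation thread is not visible on executor threads unless it is copied over, and it should be cleaned up afterwards because pool threads are reused. A conceptual sketch with plain SLF4J follows; this is not `MultiThreadMDC`'s actual implementation, and the key/value and pool size are illustrative.

```java
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class MdcPropagationSketch {

    public static void main(String[] args) {
        MDC.put("requestId", "example-request");                  // set on the invocation thread
        Map<String, String> parentContext = MDC.getCopyOfContextMap();

        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> {
            if (parentContext != null) {
                MDC.setContextMap(parentContext);                  // copy onto the worker thread
            }
            try {
                // ... process one record; log statements here carry requestId ...
            } finally {
                MDC.clear();                                       // leave the pooled thread clean for reuse
            }
        });
        executor.shutdown();
    }
}
```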
