diff --git a/lambda-invoker/pom.xml b/lambda-invoker/pom.xml
index 4f96b9b..0320d05 100644
--- a/lambda-invoker/pom.xml
+++ b/lambda-invoker/pom.xml
@@ -63,6 +63,10 @@
software.amazon.awssdk
lambda
+
+ software.amazon.awssdk
+ netty-nio-client
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java
index 8252c10..b2c7bac 100644
--- a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java
+++ b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java
@@ -20,7 +20,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaClientBuilder;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
@@ -28,9 +32,12 @@
import software.amazon.awssdk.services.lambda.model.LambdaException;
import java.net.URI;
+import java.time.Duration;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
public class LambdaFunctionHandler implements LightHttpHandler {
private static final Logger logger = LoggerFactory.getLogger(LambdaFunctionHandler.class);
@@ -39,15 +46,29 @@ public class LambdaFunctionHandler implements LightHttpHandler {
private static final String EMPTY_LAMBDA_RESPONSE = "ERR10064";
private static AbstractMetricsHandler metricsHandler;
- private static LambdaClient client;
+ private static LambdaAsyncClient client;
public LambdaFunctionHandler() {
config = LambdaInvokerConfig.load();
- LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion()));
+ SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder()
+ .readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .build();
+ ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
+ .apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout()))
+ .apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout()))
+ .build();
+
+ var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion()))
+ .httpClient(asyncHttpClient)
+ .overrideConfiguration(overrideConfig);
+
if(!StringUtils.isEmpty(config.getEndpointOverride())) {
builder.endpointOverride(URI.create(config.getEndpointOverride()));
}
client = builder.build();
+
if(config.isMetricsInjection()) {
// get the metrics handler from the handler chain for metrics registration. If we cannot get the
// metrics handler, then an error message will be logged.
@@ -91,9 +112,9 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
requestEvent.setQueryStringParameters(queryStringParameters);
requestEvent.setBody(body == null ? null : body);
String requestBody = JsonMapper.objectMapper.writeValueAsString(requestEvent);
- if(logger.isTraceEnabled()) logger.trace("requestBody = " + requestBody);
+ if(logger.isTraceEnabled()) logger.trace("requestBody = {}", requestBody);
String res = invokeFunction(client, functionName, requestBody);
- if(logger.isDebugEnabled()) logger.debug("response = " + res);
+ if(logger.isDebugEnabled()) logger.debug("response = {}", res);
if(res == null) {
setExchangeStatus(exchange, EMPTY_LAMBDA_RESPONSE, functionName);
if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint);
@@ -106,7 +127,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint);
}
- private String invokeFunction(LambdaClient awsLambda, String functionName, String requestBody) {
+ private String invokeFunction(LambdaAsyncClient client, String functionName, String requestBody) {
String response = null;
try {
//Need a SdkBytes instance for the payload
@@ -118,20 +139,20 @@ private String invokeFunction(LambdaClient awsLambda, String functionName, Strin
.logType(config.getLogType())
.payload(payload)
.build();
-
- //Invoke the Lambda function
- InvokeResponse res = awsLambda.invoke(request);
- if(logger.isDebugEnabled()) {
- logger.debug("lambda call function error:" + res.functionError());
- logger.debug("lambda logger result:" + res.logResult());
- logger.debug("lambda call status:" + res.statusCode());
- }
-
- response = res.payload().asUtf8String() ;
- } catch(LambdaException e) {
+ CompletableFuture futureResponse = client.invoke(request)
+ .thenApply(res -> {
+ if(logger.isTraceEnabled()) logger.trace("LambdaFunctionHandler.invokeFunction response: {}", res);
+ return res.payload().asUtf8String();
+ })
+ .exceptionally(e -> {
+ logger.error("Error invoking lambda function: {}", functionName, e);
+ return null;
+ });
+ return futureResponse.get();
+ } catch(InterruptedException | ExecutionException e) {
logger.error("LambdaException", e);
}
- return response;
+ return null;
}
private Map convertHeaders(HeaderMap headerMap) {
@@ -176,11 +197,25 @@ private void setResponseHeaders(HttpServerExchange exchange, Map
public static void reload() {
config.reload();
- LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion()));
+ SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder()
+ .readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
+ .build();
+ ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
+ .apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout()))
+ .apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout()))
+ .build();
+
+ var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion()))
+ .httpClient(asyncHttpClient)
+ .overrideConfiguration(overrideConfig);
+
if(!StringUtils.isEmpty(config.getEndpointOverride())) {
builder.endpointOverride(URI.create(config.getEndpointOverride()));
}
client = builder.build();
+
if(config.isMetricsInjection()) {
// get the metrics handler from the handler chain for metrics registration. If we cannot get the
// metrics handler, then an error message will be logged.
diff --git a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java
index ea97ed7..a588630 100644
--- a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java
+++ b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java
@@ -16,6 +16,8 @@ public class LambdaInvokerConfig {
public static final String CONFIG_NAME = "lambda-invoker";
private static final String REGION = "region";
private static final String ENDPOINT_OVERRIDE = "endpointOverride";
+ private static final String API_CALL_TIMEOUT = "apiCallTimeout";
+ private static final String API_CALL_ATTEMPT_TIMEOUT = "apiCallAttemptTimeout";
private static final String LOG_TYPE = "logType";
private static final String FUNCTIONS = "functions";
private static final String METRICS_INJECTION = "metricsInjection";
@@ -23,6 +25,8 @@ public class LambdaInvokerConfig {
private String region;
private String endpointOverride;
+ private int apiCallTimeout;
+ private int apiCallAttemptTimeout;
private String logType;
private Map functions;
private boolean metricsInjection;
@@ -44,6 +48,22 @@ public void setEndpointOverride(String endpointOverride) {
this.endpointOverride = endpointOverride;
}
+ public int getApiCallTimeout() {
+ return apiCallTimeout;
+ }
+
+ public void setApiCallTimeout(int apiCallTimeout) {
+ this.apiCallTimeout = apiCallTimeout;
+ }
+
+ public int getApiCallAttemptTimeout() {
+ return apiCallAttemptTimeout;
+ }
+
+ public void setApiCallAttemptTimeout(int apiCallAttemptTimeout) {
+ this.apiCallAttemptTimeout = apiCallAttemptTimeout;
+ }
+
public String getLogType() {
return logType;
}
@@ -120,13 +140,21 @@ private void setConfigData() {
if (object != null) {
endpointOverride = ((String) object);
}
+ object = mappedConfig.get(API_CALL_TIMEOUT);
+ if (object != null) {
+ apiCallTimeout = Config.loadIntegerValue(API_CALL_TIMEOUT, object);
+ }
+ object = mappedConfig.get(API_CALL_ATTEMPT_TIMEOUT);
+ if (object != null) {
+ apiCallAttemptTimeout = Config.loadIntegerValue(API_CALL_ATTEMPT_TIMEOUT, object);
+ }
object = mappedConfig.get(LOG_TYPE);
if (object != null) {
logType = ((String) object);
}
object = getMappedConfig().get(METRICS_INJECTION);
- if(object != null && (Boolean) object) {
- metricsInjection = true;
+ if(object != null) {
+ metricsInjection = Config.loadBooleanValue(METRICS_INJECTION, object);
}
object = getMappedConfig().get(METRICS_NAME);
if(object != null ) {
@@ -164,6 +192,5 @@ private void setConfigMap() {
throw new ConfigException("functions must be a string string map.");
}
}
-
}
}
diff --git a/lambda-invoker/src/main/resources/config/lambda-invoker.yml b/lambda-invoker/src/main/resources/config/lambda-invoker.yml
index 12fa63f..e240623 100644
--- a/lambda-invoker/src/main/resources/config/lambda-invoker.yml
+++ b/lambda-invoker/src/main/resources/config/lambda-invoker.yml
@@ -3,6 +3,10 @@ region: ${lambda-invoker.region:us-east-1}
# endpoint override if for lambda function deployed in virtual private cloud. Here is an example.
# https://vpce-0012C939329d982-tk8ps.lambda.ca-central-1.vpce.amazonaws.com
endpointOverride: ${lambda-invoker.endpointOverride:}
+# Api call timeout in milliseconds. This sets the amount of time for the entire execution, including all retry attempts.
+apiCallTimeout: ${lambda-invoker.apiCallTimeout:60000}
+# Api call attempt timeout in milliseconds. This sets the amount of time for each individual attempt.
+apiCallAttemptTimeout: ${lambda-invoker.apiCallAttemptTimeout:20000}
# The LogType of the execution log of Lambda. Set Tail to include and None to not include.
logType: ${lambda-invoker.logType:Tail}
# mapping of the endpoints to Lambda functions. Define a list of functions in values.yml file.
diff --git a/pom.xml b/pom.xml
index efe14b0..4667e20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -291,6 +291,11 @@
lambda
${version.lambda-awssdk}
+
+ software.amazon.awssdk
+ netty-nio-client
+ ${version.lambda-awssdk}
+
ch.qos.logback
logback-classic