From 842da33a657cc6877a821bc23b67e1edd4700ad3 Mon Sep 17 00:00:00 2001 From: Steve Hu Date: Wed, 19 Jun 2024 11:50:15 -0400 Subject: [PATCH] fixes #123 Make the Lambda client timeout configurable --- lambda-invoker/pom.xml | 4 ++ .../aws/lambda/LambdaFunctionHandler.java | 71 ++++++++++++++----- .../aws/lambda/LambdaInvokerConfig.java | 33 ++++++++- .../main/resources/config/lambda-invoker.yml | 4 ++ pom.xml | 5 ++ 5 files changed, 96 insertions(+), 21 deletions(-) diff --git a/lambda-invoker/pom.xml b/lambda-invoker/pom.xml index 4f96b9b..0320d05 100644 --- a/lambda-invoker/pom.xml +++ b/lambda-invoker/pom.xml @@ -63,6 +63,10 @@ software.amazon.awssdk lambda + + software.amazon.awssdk + netty-nio-client + com.fasterxml.jackson.core jackson-databind diff --git a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java index 8252c10..b2c7bac 100644 --- a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java +++ b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaFunctionHandler.java @@ -20,7 +20,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaAsyncClient; import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.LambdaClientBuilder; import software.amazon.awssdk.services.lambda.model.InvokeRequest; @@ -28,9 +32,12 @@ import software.amazon.awssdk.services.lambda.model.LambdaException; import java.net.URI; +import java.time.Duration; import java.util.Deque; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; public class LambdaFunctionHandler implements LightHttpHandler { private static final Logger logger = LoggerFactory.getLogger(LambdaFunctionHandler.class); @@ -39,15 +46,29 @@ public class LambdaFunctionHandler implements LightHttpHandler { private static final String EMPTY_LAMBDA_RESPONSE = "ERR10064"; private static AbstractMetricsHandler metricsHandler; - private static LambdaClient client; + private static LambdaAsyncClient client; public LambdaFunctionHandler() { config = LambdaInvokerConfig.load(); - LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion())); + SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder() + .readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .build(); + ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout())) + .apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout())) + .build(); + + var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion())) + .httpClient(asyncHttpClient) + .overrideConfiguration(overrideConfig); + if(!StringUtils.isEmpty(config.getEndpointOverride())) { builder.endpointOverride(URI.create(config.getEndpointOverride())); } client = builder.build(); + if(config.isMetricsInjection()) { // get the metrics handler from the handler chain for metrics registration. If we cannot get the // metrics handler, then an error message will be logged. @@ -91,9 +112,9 @@ public void handleRequest(HttpServerExchange exchange) throws Exception { requestEvent.setQueryStringParameters(queryStringParameters); requestEvent.setBody(body == null ? null : body); String requestBody = JsonMapper.objectMapper.writeValueAsString(requestEvent); - if(logger.isTraceEnabled()) logger.trace("requestBody = " + requestBody); + if(logger.isTraceEnabled()) logger.trace("requestBody = {}", requestBody); String res = invokeFunction(client, functionName, requestBody); - if(logger.isDebugEnabled()) logger.debug("response = " + res); + if(logger.isDebugEnabled()) logger.debug("response = {}", res); if(res == null) { setExchangeStatus(exchange, EMPTY_LAMBDA_RESPONSE, functionName); if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint); @@ -106,7 +127,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception { if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint); } - private String invokeFunction(LambdaClient awsLambda, String functionName, String requestBody) { + private String invokeFunction(LambdaAsyncClient client, String functionName, String requestBody) { String response = null; try { //Need a SdkBytes instance for the payload @@ -118,20 +139,20 @@ private String invokeFunction(LambdaClient awsLambda, String functionName, Strin .logType(config.getLogType()) .payload(payload) .build(); - - //Invoke the Lambda function - InvokeResponse res = awsLambda.invoke(request); - if(logger.isDebugEnabled()) { - logger.debug("lambda call function error:" + res.functionError()); - logger.debug("lambda logger result:" + res.logResult()); - logger.debug("lambda call status:" + res.statusCode()); - } - - response = res.payload().asUtf8String() ; - } catch(LambdaException e) { + CompletableFuture futureResponse = client.invoke(request) + .thenApply(res -> { + if(logger.isTraceEnabled()) logger.trace("LambdaFunctionHandler.invokeFunction response: {}", res); + return res.payload().asUtf8String(); + }) + .exceptionally(e -> { + logger.error("Error invoking lambda function: {}", functionName, e); + return null; + }); + return futureResponse.get(); + } catch(InterruptedException | ExecutionException e) { logger.error("LambdaException", e); } - return response; + return null; } private Map convertHeaders(HeaderMap headerMap) { @@ -176,11 +197,25 @@ private void setResponseHeaders(HttpServerExchange exchange, Map public static void reload() { config.reload(); - LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion())); + SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder() + .readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout())) + .build(); + ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout())) + .apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout())) + .build(); + + var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion())) + .httpClient(asyncHttpClient) + .overrideConfiguration(overrideConfig); + if(!StringUtils.isEmpty(config.getEndpointOverride())) { builder.endpointOverride(URI.create(config.getEndpointOverride())); } client = builder.build(); + if(config.isMetricsInjection()) { // get the metrics handler from the handler chain for metrics registration. If we cannot get the // metrics handler, then an error message will be logged. diff --git a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java index ea97ed7..a588630 100644 --- a/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java +++ b/lambda-invoker/src/main/java/com/networknt/aws/lambda/LambdaInvokerConfig.java @@ -16,6 +16,8 @@ public class LambdaInvokerConfig { public static final String CONFIG_NAME = "lambda-invoker"; private static final String REGION = "region"; private static final String ENDPOINT_OVERRIDE = "endpointOverride"; + private static final String API_CALL_TIMEOUT = "apiCallTimeout"; + private static final String API_CALL_ATTEMPT_TIMEOUT = "apiCallAttemptTimeout"; private static final String LOG_TYPE = "logType"; private static final String FUNCTIONS = "functions"; private static final String METRICS_INJECTION = "metricsInjection"; @@ -23,6 +25,8 @@ public class LambdaInvokerConfig { private String region; private String endpointOverride; + private int apiCallTimeout; + private int apiCallAttemptTimeout; private String logType; private Map functions; private boolean metricsInjection; @@ -44,6 +48,22 @@ public void setEndpointOverride(String endpointOverride) { this.endpointOverride = endpointOverride; } + public int getApiCallTimeout() { + return apiCallTimeout; + } + + public void setApiCallTimeout(int apiCallTimeout) { + this.apiCallTimeout = apiCallTimeout; + } + + public int getApiCallAttemptTimeout() { + return apiCallAttemptTimeout; + } + + public void setApiCallAttemptTimeout(int apiCallAttemptTimeout) { + this.apiCallAttemptTimeout = apiCallAttemptTimeout; + } + public String getLogType() { return logType; } @@ -120,13 +140,21 @@ private void setConfigData() { if (object != null) { endpointOverride = ((String) object); } + object = mappedConfig.get(API_CALL_TIMEOUT); + if (object != null) { + apiCallTimeout = Config.loadIntegerValue(API_CALL_TIMEOUT, object); + } + object = mappedConfig.get(API_CALL_ATTEMPT_TIMEOUT); + if (object != null) { + apiCallAttemptTimeout = Config.loadIntegerValue(API_CALL_ATTEMPT_TIMEOUT, object); + } object = mappedConfig.get(LOG_TYPE); if (object != null) { logType = ((String) object); } object = getMappedConfig().get(METRICS_INJECTION); - if(object != null && (Boolean) object) { - metricsInjection = true; + if(object != null) { + metricsInjection = Config.loadBooleanValue(METRICS_INJECTION, object); } object = getMappedConfig().get(METRICS_NAME); if(object != null ) { @@ -164,6 +192,5 @@ private void setConfigMap() { throw new ConfigException("functions must be a string string map."); } } - } } diff --git a/lambda-invoker/src/main/resources/config/lambda-invoker.yml b/lambda-invoker/src/main/resources/config/lambda-invoker.yml index 12fa63f..e240623 100644 --- a/lambda-invoker/src/main/resources/config/lambda-invoker.yml +++ b/lambda-invoker/src/main/resources/config/lambda-invoker.yml @@ -3,6 +3,10 @@ region: ${lambda-invoker.region:us-east-1} # endpoint override if for lambda function deployed in virtual private cloud. Here is an example. # https://vpce-0012C939329d982-tk8ps.lambda.ca-central-1.vpce.amazonaws.com endpointOverride: ${lambda-invoker.endpointOverride:} +# Api call timeout in milliseconds. This sets the amount of time for the entire execution, including all retry attempts. +apiCallTimeout: ${lambda-invoker.apiCallTimeout:60000} +# Api call attempt timeout in milliseconds. This sets the amount of time for each individual attempt. +apiCallAttemptTimeout: ${lambda-invoker.apiCallAttemptTimeout:20000} # The LogType of the execution log of Lambda. Set Tail to include and None to not include. logType: ${lambda-invoker.logType:Tail} # mapping of the endpoints to Lambda functions. Define a list of functions in values.yml file. diff --git a/pom.xml b/pom.xml index efe14b0..4667e20 100644 --- a/pom.xml +++ b/pom.xml @@ -291,6 +291,11 @@ lambda ${version.lambda-awssdk} + + software.amazon.awssdk + netty-nio-client + ${version.lambda-awssdk} + ch.qos.logback logback-classic