Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lambda-invoker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>lambda</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaClientBuilder;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.LambdaException;

import java.net.URI;
import java.time.Duration;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class LambdaFunctionHandler implements LightHttpHandler {
private static final Logger logger = LoggerFactory.getLogger(LambdaFunctionHandler.class);
Expand All @@ -39,15 +46,29 @@ public class LambdaFunctionHandler implements LightHttpHandler {
private static final String EMPTY_LAMBDA_RESPONSE = "ERR10064";
private static AbstractMetricsHandler metricsHandler;

private static LambdaClient client;
private static LambdaAsyncClient client;

public LambdaFunctionHandler() {
config = LambdaInvokerConfig.load();
LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion()));
SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.build();
ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout()))
.apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout()))
.build();

var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion()))
.httpClient(asyncHttpClient)
.overrideConfiguration(overrideConfig);

if(!StringUtils.isEmpty(config.getEndpointOverride())) {
builder.endpointOverride(URI.create(config.getEndpointOverride()));
}
client = builder.build();

if(config.isMetricsInjection()) {
// get the metrics handler from the handler chain for metrics registration. If we cannot get the
// metrics handler, then an error message will be logged.
Expand Down Expand Up @@ -91,9 +112,9 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
requestEvent.setQueryStringParameters(queryStringParameters);
requestEvent.setBody(body == null ? null : body);
String requestBody = JsonMapper.objectMapper.writeValueAsString(requestEvent);
if(logger.isTraceEnabled()) logger.trace("requestBody = " + requestBody);
if(logger.isTraceEnabled()) logger.trace("requestBody = {}", requestBody);
String res = invokeFunction(client, functionName, requestBody);
if(logger.isDebugEnabled()) logger.debug("response = " + res);
if(logger.isDebugEnabled()) logger.debug("response = {}", res);
if(res == null) {
setExchangeStatus(exchange, EMPTY_LAMBDA_RESPONSE, functionName);
if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint);
Expand All @@ -106,7 +127,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
if(config.isMetricsInjection() && metricsHandler != null) metricsHandler.injectMetrics(exchange, startTime, config.getMetricsName(), endpoint);
}

private String invokeFunction(LambdaClient awsLambda, String functionName, String requestBody) {
private String invokeFunction(LambdaAsyncClient client, String functionName, String requestBody) {
String response = null;
try {
//Need a SdkBytes instance for the payload
Expand All @@ -118,20 +139,20 @@ private String invokeFunction(LambdaClient awsLambda, String functionName, Strin
.logType(config.getLogType())
.payload(payload)
.build();

//Invoke the Lambda function
InvokeResponse res = awsLambda.invoke(request);
if(logger.isDebugEnabled()) {
logger.debug("lambda call function error:" + res.functionError());
logger.debug("lambda logger result:" + res.logResult());
logger.debug("lambda call status:" + res.statusCode());
}

response = res.payload().asUtf8String() ;
} catch(LambdaException e) {
CompletableFuture<String> futureResponse = client.invoke(request)
.thenApply(res -> {
if(logger.isTraceEnabled()) logger.trace("LambdaFunctionHandler.invokeFunction response: {}", res);
return res.payload().asUtf8String();
})
.exceptionally(e -> {
logger.error("Error invoking lambda function: {}", functionName, e);
return null;
});
return futureResponse.get();
} catch(InterruptedException | ExecutionException e) {
logger.error("LambdaException", e);
}
return response;
return null;
}

private Map<String, String> convertHeaders(HeaderMap headerMap) {
Expand Down Expand Up @@ -176,11 +197,25 @@ private void setResponseHeaders(HttpServerExchange exchange, Map<String, String>

public static void reload() {
config.reload();
LambdaClientBuilder builder = LambdaClient.builder().region(Region.of(config.getRegion()));
SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.writeTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.connectionTimeout(Duration.ofMillis(config.getApiCallAttemptTimeout()))
.build();
ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofMillis(config.getApiCallTimeout()))
.apiCallAttemptTimeout(Duration.ofSeconds(config.getApiCallAttemptTimeout()))
.build();

var builder = LambdaAsyncClient.builder().region(Region.of(config.getRegion()))
.httpClient(asyncHttpClient)
.overrideConfiguration(overrideConfig);

if(!StringUtils.isEmpty(config.getEndpointOverride())) {
builder.endpointOverride(URI.create(config.getEndpointOverride()));
}
client = builder.build();

if(config.isMetricsInjection()) {
// get the metrics handler from the handler chain for metrics registration. If we cannot get the
// metrics handler, then an error message will be logged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ public class LambdaInvokerConfig {
public static final String CONFIG_NAME = "lambda-invoker";
private static final String REGION = "region";
private static final String ENDPOINT_OVERRIDE = "endpointOverride";
private static final String API_CALL_TIMEOUT = "apiCallTimeout";
private static final String API_CALL_ATTEMPT_TIMEOUT = "apiCallAttemptTimeout";
private static final String LOG_TYPE = "logType";
private static final String FUNCTIONS = "functions";
private static final String METRICS_INJECTION = "metricsInjection";
private static final String METRICS_NAME = "metricsName";

private String region;
private String endpointOverride;
private int apiCallTimeout;
private int apiCallAttemptTimeout;
private String logType;
private Map<String, String> functions;
private boolean metricsInjection;
Expand All @@ -44,6 +48,22 @@ public void setEndpointOverride(String endpointOverride) {
this.endpointOverride = endpointOverride;
}

public int getApiCallTimeout() {
return apiCallTimeout;
}

public void setApiCallTimeout(int apiCallTimeout) {
this.apiCallTimeout = apiCallTimeout;
}

public int getApiCallAttemptTimeout() {
return apiCallAttemptTimeout;
}

public void setApiCallAttemptTimeout(int apiCallAttemptTimeout) {
this.apiCallAttemptTimeout = apiCallAttemptTimeout;
}

public String getLogType() {
return logType;
}
Expand Down Expand Up @@ -120,13 +140,21 @@ private void setConfigData() {
if (object != null) {
endpointOverride = ((String) object);
}
object = mappedConfig.get(API_CALL_TIMEOUT);
if (object != null) {
apiCallTimeout = Config.loadIntegerValue(API_CALL_TIMEOUT, object);
}
object = mappedConfig.get(API_CALL_ATTEMPT_TIMEOUT);
if (object != null) {
apiCallAttemptTimeout = Config.loadIntegerValue(API_CALL_ATTEMPT_TIMEOUT, object);
}
object = mappedConfig.get(LOG_TYPE);
if (object != null) {
logType = ((String) object);
}
object = getMappedConfig().get(METRICS_INJECTION);
if(object != null && (Boolean) object) {
metricsInjection = true;
if(object != null) {
metricsInjection = Config.loadBooleanValue(METRICS_INJECTION, object);
}
object = getMappedConfig().get(METRICS_NAME);
if(object != null ) {
Expand Down Expand Up @@ -164,6 +192,5 @@ private void setConfigMap() {
throw new ConfigException("functions must be a string string map.");
}
}

}
}
4 changes: 4 additions & 0 deletions lambda-invoker/src/main/resources/config/lambda-invoker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ region: ${lambda-invoker.region:us-east-1}
# endpoint override if for lambda function deployed in virtual private cloud. Here is an example.
# https://vpce-0012C939329d982-tk8ps.lambda.ca-central-1.vpce.amazonaws.com
endpointOverride: ${lambda-invoker.endpointOverride:}
# Api call timeout in milliseconds. This sets the amount of time for the entire execution, including all retry attempts.
apiCallTimeout: ${lambda-invoker.apiCallTimeout:60000}
# Api call attempt timeout in milliseconds. This sets the amount of time for each individual attempt.
apiCallAttemptTimeout: ${lambda-invoker.apiCallAttemptTimeout:20000}
# The LogType of the execution log of Lambda. Set Tail to include and None to not include.
logType: ${lambda-invoker.logType:Tail}
# mapping of the endpoints to Lambda functions. Define a list of functions in values.yml file.
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@
<artifactId>lambda</artifactId>
<version>${version.lambda-awssdk}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<version>${version.lambda-awssdk}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down