From ecab55f2abeef2e73e5e5c9cbf3893bf80ae67fa Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Fri, 11 Nov 2022 13:05:39 +0100 Subject: [PATCH 1/5] add timeout to invoke --- cmd/localstack/custom_interop.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index dbe4061..8a7c9a5 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -12,6 +12,7 @@ import ( "go.amzn.com/lambda/rapidcore/standalone" "io" "net/http" + "strconv" "strings" "time" ) @@ -87,6 +88,12 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo } invokeResp := &standalone.ResponseWriterProxy{} + invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") + invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) + if err != nil { + log.Fatalln(err) + } + invokeTimeout := time.Second * time.Duration(invokeTimeoutSeconds) functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST _, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion) @@ -99,12 +106,12 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo CorrelationID: "invokeCorrelationID", NeedDebugLogs: true, InvokedFunctionArn: invokeR.InvokedFunctionArn, + DeadlineNs: strconv.FormatInt(invokeTimeout.Nanoseconds(), 10), }) if err != nil { log.Fatalln(err) } - inv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") - timeoutDuration, _ := time.ParseDuration(inv + "s") + timeoutDuration, _ := time.ParseDuration(invokeTimeoutEnv + "s") memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector) From 34969de4da0857673face8e750db76c20ecf2e14 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Fri, 11 Nov 2022 13:46:23 +0100 Subject: [PATCH 2/5] set server timeout instead of invoke deadline --- cmd/localstack/custom_interop.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index 8a7c9a5..34a3b5f 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -68,6 +68,13 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo RuntimeId: lsOpts.RuntimeId, }, } + invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") + invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) + if err != nil { + log.Fatalln(err) + } + invokeTimeout := time.Second * time.Duration(invokeTimeoutSeconds) + server.delegate.SetInvokeTimeout(invokeTimeout) // TODO: extract this go func() { @@ -88,12 +95,6 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo } invokeResp := &standalone.ResponseWriterProxy{} - invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") - invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) - if err != nil { - log.Fatalln(err) - } - invokeTimeout := time.Second * time.Duration(invokeTimeoutSeconds) functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST _, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion) @@ -106,7 +107,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo CorrelationID: "invokeCorrelationID", NeedDebugLogs: true, InvokedFunctionArn: invokeR.InvokedFunctionArn, - DeadlineNs: strconv.FormatInt(invokeTimeout.Nanoseconds(), 10), + //DeadlineNs: }) if err != nil { log.Fatalln(err) From 2ba328ad975cfdf21f7a9907a01defa5bf557983 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Fri, 11 Nov 2022 13:52:13 +0100 Subject: [PATCH 3/5] enable json/trace logging --- cmd/localstack/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 77437ba..84f16e5 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -46,9 +46,9 @@ func main() { lsOpts := InitLsOpts() // set up logging (logrus) - //log.SetFormatter(&log.JSONFormatter{}) - //log.SetLevel(log.TraceLevel) - log.SetLevel(log.DebugLevel) + log.SetFormatter(&log.JSONFormatter{}) + log.SetLevel(log.TraceLevel) + //log.SetLevel(log.DebugLevel) log.SetReportCaller(true) // download code archive if env variable is set DownloadCodeArchive(lsOpts.CodeDownloadUrl) From b0da13f4ba00ca2ab7efeabd1729e9ebf6fa770d Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 14 Nov 2022 16:38:08 +0100 Subject: [PATCH 4/5] add timeout handling --- Makefile | 3 ++- cmd/localstack/custom_interop.go | 45 +++++++++++++++++++++----------- cmd/localstack/main.go | 11 ++++++-- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 1384ea2..22e7cdb 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ # LOCALSTACK CHANGES 2022-03-10: remove linker flags and add gc flags for delve debugger # LOCALSTACK CHANGES 2022-03-28: change compile src folder +# LOCALSTACK CHANGES 2022-11-14: add --rm flag to compile-with-docker # RELEASE_BUILD_LINKER_FLAGS disables DWARF and symbol table generation to reduce binary size #RELEASE_BUILD_LINKER_FLAGS=-s -w @@ -21,7 +22,7 @@ compile-lambda-linux-all: make ARCH=arm64 compile-lambda-linux compile-with-docker: - docker run --env GOPROXY=direct -v $(shell pwd):/LambdaRuntimeLocal -w /LambdaRuntimeLocal golang:1.18 make ARCH=${ARCH} compile-lambda-linux + docker run --rm --env GOPROXY=direct -v $(shell pwd):/LambdaRuntimeLocal -w /LambdaRuntimeLocal golang:1.18 make ARCH=${ARCH} compile-lambda-linux compile-lambda-linux: CGO_ENABLED=0 GOOS=linux GOARCH=${GO_ARCH_${ARCH}} go build -ldflags "${RELEASE_BUILD_LINKER_FLAGS}" -gcflags="all=-N -l" -o ${DESTINATION_${ARCH}} ./cmd/localstack diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index 34a3b5f..18e324a 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -12,7 +12,6 @@ import ( "go.amzn.com/lambda/rapidcore/standalone" "io" "net/http" - "strconv" "strings" "time" ) @@ -53,9 +52,9 @@ type InvokeRequest struct { type ErrorResponse struct { ErrorMessage string `json:"errorMessage"` - ErrorType string `json:"errorType"` - RequestId string `json:"requestId"` - StackTrace []string `json:"stackTrace"` + ErrorType string `json:"errorType,omitempty"` + RequestId string `json:"requestId,omitempty"` + StackTrace []string `json:"stackTrace,omitempty"` } func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, logCollector *LogCollector) (server *CustomInteropServer) { @@ -68,13 +67,6 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo RuntimeId: lsOpts.RuntimeId, }, } - invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") - invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) - if err != nil { - log.Fatalln(err) - } - invokeTimeout := time.Second * time.Duration(invokeTimeoutSeconds) - server.delegate.SetInvokeTimeout(invokeTimeout) // TODO: extract this go func() { @@ -109,10 +101,34 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo InvokedFunctionArn: invokeR.InvokedFunctionArn, //DeadlineNs: }) + timeout := int(server.delegate.GetInvokeTimeout().Seconds()) + isErr := false if err != nil { - log.Fatalln(err) + switch err { + case rapidcore.ErrInvokeTimeout: + log.Debugf("Got invoke timeout") + isErr = true + errorResponse := ErrorResponse{ + ErrorMessage: fmt.Sprintf( + "%s %s Task timed out after %d.00 seconds", + time.Now().Format("2006-01-02T15:04:05Z"), + invokeR.InvokeId, + timeout, + ), + } + jsonErrorResponse, err := json.Marshal(errorResponse) + if err != nil { + log.Fatalln("unable to marshall json timeout response") + } + _, err = invokeResp.Write(jsonErrorResponse) + if err != nil { + log.Fatalln("unable to write to response") + } + default: + log.Fatalln(err) + } } - timeoutDuration, _ := time.ParseDuration(invokeTimeoutEnv + "s") + timeoutDuration := time.Duration(timeout) * time.Second memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector) @@ -125,8 +141,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo var errR map[string]any marshalErr := json.Unmarshal(invokeResp.Body, &errR) - isErr := false - if marshalErr == nil { + if !isErr && marshalErr == nil { _, isErr = errR["errorType"] } diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 84f16e5..67db293 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -9,6 +9,7 @@ import ( _ "net/http/pprof" "os" "runtime/debug" + "strconv" ) type LsOpts struct { @@ -72,12 +73,18 @@ func main() { // initialize all flows and start runtime API go sandbox.Create() + // get timeout + invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") + invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) + if err != nil { + log.Fatalln(err) + } // start runtime init - go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), 30) // TODO: replace this with a custom init + go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init // TODO: make the tracing server optional // start blocking with the tracing server - err := http.ListenAndServe("0.0.0.0:"+lsOpts.InitTracingPort, http.DefaultServeMux) + err = http.ListenAndServe("0.0.0.0:"+lsOpts.InitTracingPort, http.DefaultServeMux) if err != nil { log.Fatal("Failed to start debug server") } From 9d755c4a0b56bf32d72828b89784e47f459204d6 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 14 Nov 2022 16:39:51 +0100 Subject: [PATCH 5/5] Revert "enable json/trace logging" This reverts commit 2ba328ad975cfdf21f7a9907a01defa5bf557983. --- cmd/localstack/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 67db293..9da81ad 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -47,9 +47,9 @@ func main() { lsOpts := InitLsOpts() // set up logging (logrus) - log.SetFormatter(&log.JSONFormatter{}) - log.SetLevel(log.TraceLevel) - //log.SetLevel(log.DebugLevel) + //log.SetFormatter(&log.JSONFormatter{}) + //log.SetLevel(log.TraceLevel) + log.SetLevel(log.DebugLevel) log.SetReportCaller(true) // download code archive if env variable is set DownloadCodeArchive(lsOpts.CodeDownloadUrl)