diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index f32eca7d..5c61806c 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -38,12 +38,16 @@ var bufferPool = sync.Pool{New: func() interface{} { type ApmServerTransportStatusType string +// Constants for the state of the transport used in +// the backoff implementation. const ( Failing ApmServerTransportStatusType = "Failing" Pending ApmServerTransportStatusType = "Pending" Healthy ApmServerTransportStatusType = "Healthy" ) +// A struct to track the state and status of sending +// to the APM server. Used in the backoff implementation. type ApmServerTransportStateType struct { sync.Mutex Status ApmServerTransportStatusType @@ -51,11 +55,19 @@ type ApmServerTransportStateType struct { GracePeriodTimer *time.Timer } +// The status of transport to the APM server. +// +// This instance of the ApmServerTransportStateType is public for use in tests. var ApmServerTransportState = ApmServerTransportStateType{ Status: Healthy, ReconnectionCount: -1, } +// PostToApmServer takes a chunk of APM agent data and posts it to the APM server. +// +// The function compresses the APM agent data, if it's not already compressed. +// It sets the APM transport status to failing upon errors, as part of the backoff +// strategy. func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? @@ -123,10 +135,21 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension return nil } +// IsTransportStatusHealthyOrPending returns true if the APM server transport status is +// healthy or pending, and false otherwise. +// +// This function is public for use in tests. func IsTransportStatusHealthyOrPending() bool { return ApmServerTransportState.Status != Failing } +// SetApmServerTransportState takes a state of the APM server transport and updates +// the current state of the transport. For a change to a failing state, the grace period +// is calculated and a go routine is started that waits for that period to complete +// before changing the status to "pending". This would allow a subsequent send attempt +// to the APM server. +// +// This function is public for use in tests. func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) { switch status { case Healthy: @@ -165,6 +188,8 @@ func computeGracePeriod() time.Duration { return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second)) } +// EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send +// to the APM server. func EnqueueAPMData(agentDataChannel chan AgentData, agentData AgentData) { select { case agentDataChannel <- agentData: diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 5438b0bf..19a3decf 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -25,6 +25,7 @@ import ( var agentDataServer *http.Server +// StartHttpServer starts the server listening for APM agent data. func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) { mux := http.NewServeMux() mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl)) diff --git a/apm-lambda-extension/extension/process_env.go b/apm-lambda-extension/extension/process_env.go index 6c56610a..a8902b9a 100644 --- a/apm-lambda-extension/extension/process_env.go +++ b/apm-lambda-extension/extension/process_env.go @@ -63,7 +63,7 @@ func getIntFromEnv(name string) (int, error) { return value, nil } -// ProcessEnv : pull env into globals +// ProcessEnv extracts ENV variables into globals func ProcessEnv() *extensionConfig { dataReceiverTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS") if err != nil { diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/process_events.go index 66e4bd73..c47e5644 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/process_events.go @@ -23,11 +23,14 @@ import ( "net/http" ) +// ProcessShutdown processes the Shutdown event received from the +// Lambda runtime API. func ProcessShutdown() { Log.Info("Received SHUTDOWN event, exiting") agentDataServer.Close() } +// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig, ctx context.Context) { if !IsTransportStatusHealthyOrPending() { Log.Debug("Flush skipped - Transport unhealthy") @@ -48,6 +51,7 @@ func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *exten } } +// PrettyPrint prints formatted, legible json data. func PrettyPrint(v interface{}) string { data, err := json.MarshalIndent(v, "", "\t") if err != nil { diff --git a/apm-lambda-extension/logsapi/client.go b/apm-lambda-extension/logsapi/client.go index bedc4f4d..f8dd490c 100644 --- a/apm-lambda-extension/logsapi/client.go +++ b/apm-lambda-extension/logsapi/client.go @@ -55,6 +55,7 @@ const ( Extension EventType = "extension" ) +// SubEventType is a Logs API sub event type type SubEventType string const ( @@ -108,6 +109,7 @@ type Destination struct { Encoding HttpEncoding `json:"encoding"` } +// SchemaVersion is the Lambda runtime API schema version type SchemaVersion string const ( diff --git a/apm-lambda-extension/logsapi/subscribe.go b/apm-lambda-extension/logsapi/subscribe.go index 9955b81c..d1afb82a 100644 --- a/apm-lambda-extension/logsapi/subscribe.go +++ b/apm-lambda-extension/logsapi/subscribe.go @@ -33,6 +33,7 @@ var ListenerHost = "sandbox" var Server *http.Server var Listener net.Listener +// LogEvent represents an event received from the Logs API type LogEvent struct { Time time.Time `json:"time"` Type SubEventType `json:"type"` @@ -40,6 +41,7 @@ type LogEvent struct { Record LogEventRecord } +// LogEventRecord is a sub-object in a Logs API event type LogEventRecord struct { RequestId string `json:"requestId"` Status string `json:"status"` @@ -63,7 +65,7 @@ func subscribe(extensionID string, eventTypes []EventType) error { return err } -// Subscribe : Starts the HTTP server listening for log events and subscribes to the Logs API +// Subscribe starts the HTTP server listening for log events and subscribes to the Logs API func Subscribe(ctx context.Context, extensionID string, eventTypes []EventType, out chan LogEvent) (err error) { if checkAWSSamLocal() { return errors.New("Detected sam local environment")