Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions apm-lambda-extension/extension/apm_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,36 @@ var bufferPool = sync.Pool{New: func() interface{} {

type ApmServerTransportStatusType string

// Constants for the state of the transport used in
// the backoff implementation.
const (
Failing ApmServerTransportStatusType = "Failing"
Pending ApmServerTransportStatusType = "Pending"
Healthy ApmServerTransportStatusType = "Healthy"
)

// A struct to track the state and status of sending
// to the APM server. Used in the backoff implementation.
type ApmServerTransportStateType struct {
sync.Mutex
Status ApmServerTransportStatusType
ReconnectionCount int
GracePeriodTimer *time.Timer
}

// The status of transport to the APM server.
//
// This instance of the ApmServerTransportStateType is public for use in tests.
var ApmServerTransportState = ApmServerTransportStateType{
Status: Healthy,
ReconnectionCount: -1,
}

// PostToApmServer takes a chunk of APM agent data and posts it to the APM server.
//
// The function compresses the APM agent data, if it's not already compressed.
// It sets the APM transport status to failing upon errors, as part of the backoff
// strategy.
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error {
// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
Expand Down Expand Up @@ -123,10 +135,21 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
return nil
}

// IsTransportStatusHealthyOrPending returns true if the APM server transport status is
// healthy or pending, and false otherwise.
//
// This function is public for use in tests.
func IsTransportStatusHealthyOrPending() bool {
return ApmServerTransportState.Status != Failing
}

// SetApmServerTransportState takes a state of the APM server transport and updates
// the current state of the transport. For a change to a failing state, the grace period
// is calculated and a go routine is started that waits for that period to complete
// before changing the status to "pending". This would allow a subsequent send attempt
// to the APM server.
//
// This function is public for use in tests.
func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) {
switch status {
case Healthy:
Expand Down Expand Up @@ -165,6 +188,8 @@ func computeGracePeriod() time.Duration {
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
}

// EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send
// to the APM server.
func EnqueueAPMData(agentDataChannel chan AgentData, agentData AgentData) {
select {
case agentDataChannel <- agentData:
Expand Down
1 change: 1 addition & 0 deletions apm-lambda-extension/extension/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

var agentDataServer *http.Server

// StartHttpServer starts the server listening for APM agent data.
func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) {
mux := http.NewServeMux()
mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl))
Expand Down
2 changes: 1 addition & 1 deletion apm-lambda-extension/extension/process_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func getIntFromEnv(name string) (int, error) {
return value, nil
}

// ProcessEnv : pull env into globals
// ProcessEnv extracts ENV variables into globals
func ProcessEnv() *extensionConfig {
dataReceiverTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS")
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions apm-lambda-extension/extension/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"net/http"
)

// ProcessShutdown processes the Shutdown event received from the
// Lambda runtime API.
func ProcessShutdown() {
Log.Info("Received SHUTDOWN event, exiting")
agentDataServer.Close()
}

// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig, ctx context.Context) {
if !IsTransportStatusHealthyOrPending() {
Log.Debug("Flush skipped - Transport unhealthy")
Expand All @@ -48,6 +51,7 @@ func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *exten
}
}

// PrettyPrint prints formatted, legible json data.
func PrettyPrint(v interface{}) string {
data, err := json.MarshalIndent(v, "", "\t")
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions apm-lambda-extension/logsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
Extension EventType = "extension"
)

// SubEventType is a Logs API sub event type
type SubEventType string

const (
Expand Down Expand Up @@ -108,6 +109,7 @@ type Destination struct {
Encoding HttpEncoding `json:"encoding"`
}

// SchemaVersion is the Lambda runtime API schema version
type SchemaVersion string

const (
Expand Down
4 changes: 3 additions & 1 deletion apm-lambda-extension/logsapi/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ var ListenerHost = "sandbox"
var Server *http.Server
var Listener net.Listener

// LogEvent represents an event received from the Logs API
type LogEvent struct {
Time time.Time `json:"time"`
Type SubEventType `json:"type"`
StringRecord string
Record LogEventRecord
}

// LogEventRecord is a sub-object in a Logs API event
type LogEventRecord struct {
RequestId string `json:"requestId"`
Status string `json:"status"`
Expand All @@ -63,7 +65,7 @@ func subscribe(extensionID string, eventTypes []EventType) error {
return err
}

// Subscribe : Starts the HTTP server listening for log events and subscribes to the Logs API
// Subscribe starts the HTTP server listening for log events and subscribes to the Logs API
func Subscribe(ctx context.Context, extensionID string, eventTypes []EventType, out chan LogEvent) (err error) {
if checkAWSSamLocal() {
return errors.New("Detected sam local environment")
Expand Down