Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apm-lambda-extension/e2e-testing/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func TestEndToEnd(t *testing.T) {
panic("No config file")
}

extension.InitLogger()
if os.Getenv("ELASTIC_APM_LOG_LEVEL") != "" {
logLevel, _ := logrus.ParseLevel(os.Getenv("ELASTIC_APM_LOG_LEVEL"))
extension.Log.Logger.SetLevel(logLevel)
Expand Down
87 changes: 86 additions & 1 deletion apm-lambda-extension/extension/apm_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,49 @@ package extension
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"sync"
"time"
)

var bufferPool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}

type ApmServerTransportStatusType string

const (
Failing ApmServerTransportStatusType = "Failing"
Pending ApmServerTransportStatusType = "Pending"
Healthy ApmServerTransportStatusType = "Healthy"
)

type ApmServerTransportStateType struct {
sync.Mutex
Status ApmServerTransportStatusType
ReconnectionCount int
GracePeriodTimer *time.Timer
}

var ApmServerTransportState = ApmServerTransportStateType{
Status: Healthy,
ReconnectionCount: -1,
}

// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig) error {
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error {
if !IsTransportStatusHealthyOrPending() {
return errors.New("transport status is unhealthy")
}

endpointURI := "intake/v2/events"
encoding := agentData.ContentEncoding

Expand Down Expand Up @@ -72,19 +101,75 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
}

Log.Debug("Sending data chunk to APM Server")
resp, err := client.Do(req)
if err != nil {
SetApmServerTransportState(Failing, ctx)
return fmt.Errorf("failed to post to APM server: %v", err)
}

//Read the response body
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
SetApmServerTransportState(Failing, ctx)
return fmt.Errorf("failed to read the response body after posting to the APM server")
}

SetApmServerTransportState(Healthy, ctx)
Log.Debug("Transport status set to healthy")
Log.Debugf("APM server response body: %v", string(body))
Log.Debugf("APM server response status code: %v", resp.StatusCode)
return nil
}

func IsTransportStatusHealthyOrPending() bool {
return ApmServerTransportState.Status != Failing
}

func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) {
switch status {
case Healthy:
ApmServerTransportState.Lock()
ApmServerTransportState.Status = status
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.ReconnectionCount = -1
ApmServerTransportState.Unlock()
case Failing:
ApmServerTransportState.Lock()
ApmServerTransportState.Status = status
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.ReconnectionCount++
ApmServerTransportState.GracePeriodTimer = time.NewTimer(computeGracePeriod())
Log.Debugf("Grace period entered, reconnection count : %d", ApmServerTransportState.ReconnectionCount)
go func() {
select {
case <-ApmServerTransportState.GracePeriodTimer.C:
Log.Debug("Grace period over - timer timed out")
case <-ctx.Done():
Log.Debug("Grace period over - context done")
}
ApmServerTransportState.Status = Pending
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.Unlock()
}()
default:
Log.Errorf("Cannot set APM Server Transport status to %s", status)
}
}

// ComputeGracePeriod https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
func computeGracePeriod() time.Duration {
gracePeriodWithoutJitter := math.Pow(math.Min(float64(ApmServerTransportState.ReconnectionCount), 6), 2)
jitter := rand.Float64()/5 - 0.1
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
}

func EnqueueAPMData(agentDataChannel chan AgentData, agentData AgentData) {
select {
case agentDataChannel <- agentData:
Log.Debug("Adding agent data to buffer to be sent to apm server")
default:
Log.Warn("Channel full: dropping a subset of agent data")
}
}
Loading