Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits]
===== Features
- Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292]
- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303]
- Batch data collected from lambda logs API before sending to APM Server {lambda-pull}314[314]

[float]
===== Bug fixes
Expand Down
112 changes: 80 additions & 32 deletions apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,30 @@ type jsonError struct {
Document string `json:"document,omitempty"`
}

// ForwardApmData receives agent data as it comes in and posts it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// ForwardApmData receives apm data as it comes in and posts it to the APM server.
// Stop checking for, and sending apm data when the function invocation
// has completed, signaled via a channel.
func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error {
func (c *Client) ForwardApmData(ctx context.Context) error {
if c.IsUnhealthy() {
return nil
}

var lambdaDataChan chan APMData
for {
select {
case <-ctx.Done():
c.logger.Debug("Invocation context cancelled, not processing any more agent data")
return nil
case agentData := <-c.DataChannel:
if metadataContainer.Metadata == nil {
metadata, err := ProcessMetadata(agentData)
if err != nil {
return fmt.Errorf("failed to extract metadata from agent payload %w", err)
}
metadataContainer.Metadata = metadata
case data := <-c.AgentDataChannel:
if err := c.forwardAgentData(ctx, data); err != nil {
return err
}
if err := c.PostToApmServer(ctx, agentData); err != nil {
return fmt.Errorf("error sending to APM server, skipping: %v", err)
// Wait for metadata to be available, metadata will be available as soon as
// the first agent data is processed.
lambdaDataChan = c.LambdaDataChannel
case data := <-lambdaDataChan:
if err := c.forwardLambdaData(ctx, data); err != nil {
return err
}
}
}
Expand All @@ -74,15 +76,38 @@ func (c *Client) FlushAPMData(ctx context.Context) {
return
}
c.logger.Debug("Flush started - Checking for agent data")

// Flush agent data first to make sure metadata is available if possible
for i := len(c.AgentDataChannel); i > 0; i-- {
data := <-c.AgentDataChannel
if err := c.forwardAgentData(ctx, data); err != nil {
c.logger.Errorf("Error sending to APM Server, skipping: %v", err)
}
}

// If metadata still not available then fail fast
if c.batch == nil {
c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server")
return
}

// Flush lambda data
for {
select {
case agentData := <-c.DataChannel:
c.logger.Debug("Flush in progress - Processing agent data")
if err := c.PostToApmServer(ctx, agentData); err != nil {
case apmData := <-c.LambdaDataChannel:
c.logger.Debug("Flush in progress - Processing lambda data")
if err := c.forwardLambdaData(ctx, apmData); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
case <-ctx.Done():
c.logger.Debugf("Failed to flush completely, may result in data drop")
return
default:
c.logger.Debug("Flush ended - No agent data on buffer")
// Flush any remaining data in batch
if err := c.sendBatch(ctx); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
c.logger.Debug("Flush ended for lambda data - no data in buffer")
return
}
}
Expand All @@ -93,19 +118,19 @@ func (c *Client) FlushAPMData(ctx context.Context) {
// The function compresses the APM agent data, if it's not already compressed.
// It sets the APM transport status to failing upon errors, as part of the backoff
// strategy.
func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error {
func (c *Client) PostToApmServer(ctx context.Context, apmData APMData) error {
// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
if c.IsUnhealthy() {
return errors.New("transport status is unhealthy")
}

endpointURI := "intake/v2/events"
encoding := agentData.ContentEncoding
encoding := apmData.ContentEncoding

var r io.Reader
if agentData.ContentEncoding != "" {
r = bytes.NewReader(agentData.Data)
if apmData.ContentEncoding != "" {
r = bytes.NewReader(apmData.Data)
} else {
encoding = "gzip"
buf := c.bufferPool.Get().(*bytes.Buffer)
Expand All @@ -117,7 +142,7 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error
if err != nil {
return err
}
if _, err := gw.Write(agentData.Data); err != nil {
if _, err := gw.Write(apmData.Data); err != nil {
return fmt.Errorf("failed to compress data: %w", err)
}
if err := gw.Close(); err != nil {
Expand Down Expand Up @@ -281,17 +306,6 @@ func (c *Client) ComputeGracePeriod() time.Duration {
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
}

// EnqueueAPMData adds an AgentData struct to the agent data channel, effectively
// queueing it for a send to the APM server.
//
// The send is non-blocking: if the channel cannot accept the value right away,
// the data is dropped and a warning is logged instead of blocking the caller.
func (c *Client) EnqueueAPMData(agentData AgentData) {
select {
case c.DataChannel <- agentData:
c.logger.Debug("Adding agent data to buffer to be sent to apm server")
default:
// The channel is not ready to receive; drop the payload rather than block.
c.logger.Warn("Channel full: dropping a subset of agent data")
}
}

// ShouldFlush returns true if the client should flush APM data after processing the event.
func (c *Client) ShouldFlush() bool {
return c.sendStrategy == SyncFlush
Expand All @@ -313,3 +327,37 @@ func (c *Client) WaitForFlush() <-chan struct{} {
defer c.flushMutex.Unlock()
return c.flushCh
}

// forwardAgentData posts a payload received from the agent to the APM server.
//
// When no batch exists yet, the payload is first run through ProcessMetadata
// and the resulting metadata is used to create the client's batch. If metadata
// extraction fails, the wrapped error is returned and the payload is not
// posted. Otherwise the result of PostToApmServer is returned.
func (c *Client) forwardAgentData(ctx context.Context, apmData APMData) error {
	// Batch already initialised: nothing to extract, just forward.
	if c.batch != nil {
		return c.PostToApmServer(ctx, apmData)
	}

	md, mdErr := ProcessMetadata(apmData)
	if mdErr != nil {
		return fmt.Errorf("failed to extract metadata from agent payload %w", mdErr)
	}
	c.batch = NewBatch(c.maxBatchSize, c.maxBatchAge, md)

	return c.PostToApmServer(ctx, apmData)
}

// forwardLambdaData adds lambda data to the client's batch and sends the
// batch once the batch reports that it should be shipped.
//
// It returns an error when no batch exists, or when sending the batch fails.
// A failure to add the data to the batch is only logged as a warning; the
// ship check still runs afterwards.
func (c *Client) forwardLambdaData(ctx context.Context, apmData APMData) error {
	if c.batch == nil {
		// Lambda data is only processed after metadata is available and the
		// batch has been created, so reaching this point is unexpected.
		return errors.New("unexpected state, metadata not yet set")
	}

	if addErr := c.batch.Add(apmData); addErr != nil {
		c.logger.Warnf("Dropping data due to error: %v", addErr)
	}

	if !c.batch.ShouldShip() {
		return nil
	}
	return c.sendBatch(ctx)
}

// sendBatch posts the contents of the current batch to the APM server.
//
// It is a no-op returning nil when there is no batch or the batch is empty.
// The batch is reset once the post attempt returns, whether or not it
// succeeded; the result of PostToApmServer is returned to the caller.
func (c *Client) sendBatch(ctx context.Context) error {
	if c.batch == nil {
		return nil
	}
	if c.batch.Count() == 0 {
		return nil
	}

	// Deferred so the reset happens after the post attempt completes.
	defer c.batch.Reset()

	payload := c.batch.ToAPMData()
	return c.PostToApmServer(ctx, payload)
}
Loading