diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 8f5760af..08a06ec4 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -218,3 +218,25 @@ func (c *Client) EnqueueAPMData(agentData AgentData) { c.logger.Warn("Channel full: dropping a subset of agent data") } } + +// ShouldFlush returns true if the client should flush APM data after processing the event. +func (c *Client) ShouldFlush() bool { + return c.sendStrategy == SyncFlush +} + +// ResetFlush resets the client's "agent flushed" state, such that +// subsequent calls to WaitForFlush will block until another request +// is received from the agent indicating it has flushed. +func (c *Client) ResetFlush() { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + c.flushCh = make(chan struct{}) +} + +// WaitForFlush returns a channel that is closed when the agent has signalled that +// the Lambda invocation has completed, and there is no more APM data coming. +func (c *Client) WaitForFlush() <-chan struct{} { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + return c.flushCh +} diff --git a/apm-lambda-extension/apmproxy/client.go b/apm-lambda-extension/apmproxy/client.go index 18a5748b..5b2bc08c 100644 --- a/apm-lambda-extension/apmproxy/client.go +++ b/apm-lambda-extension/apmproxy/client.go @@ -29,7 +29,19 @@ import ( "go.uber.org/zap" ) +// SendStrategy represents the type of sending strategy the extension uses +type SendStrategy string + const ( + // Background send strategy allows the extension to send remaining buffered + // agent data on the next function invocation + Background SendStrategy = "background" + + // SyncFlush send strategy indicates that the extension will synchronously + // flush remaining buffered agent data when it receives a signal that the + // function is complete + SyncFlush SendStrategy = "syncflush" + defaultDataReceiverTimeout time.Duration = 15 * time.Second defaultDataForwarderTimeout time.Duration = 3 * time.Second defaultReceiverAddr = ":8200" @@ -40,7 +52,6 @@ const ( type Client struct { mu sync.Mutex bufferPool sync.Pool - AgentDoneSignal chan struct{} DataChannel chan AgentData client *http.Client Status Status @@ -49,7 +60,11 @@ type Client struct { ServerSecretToken string serverURL string receiver *http.Server + sendStrategy SendStrategy logger *zap.SugaredLogger + + flushMutex sync.Mutex + flushCh chan struct{} } func NewClient(opts ...Option) (*Client, error) { @@ -69,6 +84,8 @@ func NewClient(opts ...Option) (*Client, error) { WriteTimeout: defaultDataReceiverTimeout, MaxHeaderBytes: 1 << 20, }, + sendStrategy: SyncFlush, + flushCh: make(chan struct{}), } c.client.Timeout = defaultDataForwarderTimeout diff --git a/apm-lambda-extension/apmproxy/option.go b/apm-lambda-extension/apmproxy/option.go index ba00f5c9..60b353f5 100644 --- a/apm-lambda-extension/apmproxy/option.go +++ b/apm-lambda-extension/apmproxy/option.go @@ -64,6 +64,13 @@ func WithReceiverAddress(addr string) Option { } } +// WithSendStrategy sets the sendstrategy. +func WithSendStrategy(strategy SendStrategy) Option { + return func(c *Client) { + c.sendStrategy = strategy + } +} + // WithAgentDataBufferSize sets the agent data buffer size. func WithAgentDataBufferSize(size int) Option { return func(c *Client) { diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index 7744142b..663e0ee5 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -121,17 +121,34 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ return } - if len(rawBytes) > 0 { - agentData := AgentData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), - } + agentFlushed := r.URL.Query().Get("flushed") == "true" + + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + } + if len(agentData.Data) != 0 { c.EnqueueAPMData(agentData) } - if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { - c.AgentDoneSignal <- struct{}{} + if agentFlushed { + c.flushMutex.Lock() + + select { + case <-c.flushCh: + // the channel is closed. + // the extension received at least a flush request already but the + // data have not been flushed yet. + // We can reuse the closed channel. + default: + // no pending flush requests + // close the channel to signal a flush request has + // been received. + close(c.flushCh) + } + + c.flushMutex.Unlock() } w.WriteHeader(http.StatusAccepted) diff --git a/apm-lambda-extension/apmproxy/receiver_test.go b/apm-lambda-extension/apmproxy/receiver_test.go index 20cd95ce..f4e681ac 100644 --- a/apm-lambda-extension/apmproxy/receiver_test.go +++ b/apm-lambda-extension/apmproxy/receiver_test.go @@ -182,7 +182,6 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -203,11 +202,9 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { }() select { - case <-apmClient.AgentDoneSignal: - <-apmClient.DataChannel + case <-apmClient.WaitForFlush(): case <-time.After(1 * time.Second): - t.Log("Timed out waiting for server to send FuncDone signal") - t.Fail() + t.Fatal("Timed out waiting for server to send flush signal") } } @@ -227,7 +224,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -271,7 +267,6 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -292,9 +287,8 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { }() select { - case <-apmClient.AgentDoneSignal: + case <-apmClient.WaitForFlush(): case <-time.After(1 * time.Second): - t.Log("Timed out waiting for server to send FuncDone signal") - t.Fail() + t.Fatal("Timed out waiting for server to send flush signal") } } diff --git a/apm-lambda-extension/app/app.go b/apm-lambda-extension/app/app.go index e95949e4..d6e30d27 100644 --- a/apm-lambda-extension/app/app.go +++ b/apm-lambda-extension/app/app.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "strconv" + "strings" "time" "go.elastic.co/ecszap" @@ -97,6 +98,10 @@ func New(opts ...configOption) (*App, error) { apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(fmt.Sprintf(":%s", port))) } + if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok { + apmOpts = append(apmOpts, apmproxy.WithSendStrategy(strategy)) + } + if bufferSize := os.Getenv("ELASTIC_APM_LAMBDA_AGENT_DATA_BUFFER_SIZE"); bufferSize != "" { size, err := strconv.Atoi(bufferSize) if err != nil { @@ -132,6 +137,17 @@ func getIntFromEnvIfAvailable(name string) (int, error) { return value, nil } +func parseStrategy(value string) (apmproxy.SendStrategy, bool) { + switch strings.ToLower(value) { + case "background": + return apmproxy.Background, true + case "syncflush": + return apmproxy.SyncFlush, true + } + + return "", false +} + func buildLogger(level string) (*zap.SugaredLogger, error) { if level == "" { level = "info" diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index fc2c67a9..85766f5c 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -87,6 +87,14 @@ func (app *App) Run(ctx context.Context) error { } } + // Flush all data before shutting down. + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + app.apmClient.FlushAPMData(ctx) + }() + // The previous event id is used to validate the received Lambda metrics var prevEvent *extension.NextEventResponse // This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each @@ -114,7 +122,7 @@ func (app *App) Run(ctx context.Context) error { } app.logger.Debug("Waiting for background data send to end") backgroundDataSendWg.Wait() - if config.SendStrategy == extension.SyncFlush { + if app.apmClient.ShouldFlush() { // Flush APM data now that the function invocation has completed app.apmClient.FlushAPMData(ctx) } @@ -129,6 +137,8 @@ func (app *App) processEvent( prevEvent *extension.NextEventResponse, metadataContainer *apmproxy.MetadataContainer, ) (*extension.NextEventResponse, error) { + // Reset flush state for future events. + defer app.apmClient.ResetFlush() // Invocation context invocationCtx, invocationCancel := context.WithCancel(ctx) @@ -161,8 +171,6 @@ func (app *App) processEvent( } // APM Data Processing - app.apmClient.AgentDoneSignal = make(chan struct{}) - defer close(app.apmClient.AgentDoneSignal) backgroundDataSendWg.Add(1) go func() { defer backgroundDataSendWg.Done() @@ -201,9 +209,10 @@ func (app *App) processEvent( // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. + select { - case <-app.apmClient.AgentDoneSignal: - app.logger.Debug("Received agent done signal") + case <-app.apmClient.WaitForFlush(): + app.logger.Debug("APM client has pending flush signals") case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: