Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 77 additions & 100 deletions apm-lambda-extension/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"elastic/apm-lambda-extension/logsapi"
"encoding/json"
"fmt"
"github.com/stretchr/testify/suite"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -283,193 +284,169 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) {
}

// TESTS
func TestMain(m *testing.M) {
type MainUnitTestsSuite struct {
suite.Suite
eventsChannel chan MockEvent
lambdaServer *httptest.Server
apmServer *httptest.Server
apmServerInternals *APMServerInternals
ctx context.Context
cancel context.CancelFunc
}

func TestMainUnitTestsSuite(t *testing.T) {
suite.Run(t, new(MainUnitTestsSuite))
}

// This function executes before each test case
func (suite *MainUnitTestsSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
http.DefaultServeMux = new(http.ServeMux)
code := m.Run()
os.Exit(code)
suite.eventsChannel = make(chan MockEvent, 100)
suite.lambdaServer, suite.apmServer, suite.apmServerInternals = initMockServers(suite.eventsChannel)
extension.SetApmServerTransportState(extension.Healthy, suite.ctx)
}

// TestStandardEventsChain checks a nominal sequence of events (fast APM server, only one standard event)
func TestStandardEventsChain(t *testing.T) {
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()
// This function executes after each test case
func (suite *MainUnitTestsSuite) TearDownTest() {
suite.lambdaServer.Close()
suite.apmServer.Close()
suite.cancel()
}

// TestStandardEventsChain checks a nominal sequence of events (fast APM server, only one standard event)
func (suite *MainUnitTestsSuite) TestStandardEventsChain() {
eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
}

// TestFlush checks if the flushed param does not cause a panic or an unexpected behavior
func TestFlush(t *testing.T) {
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

func (suite *MainUnitTestsSuite) TestFlush() {
eventsChain := []MockEvent{
{Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
}

// TestWaitGroup checks if there is no race condition between the main waitgroups (issue #128)
func TestWaitGroup(t *testing.T) {
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

func (suite *MainUnitTestsSuite) TestWaitGroup() {
eventsChain := []MockEvent{
{Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
}

// TestAPMServerDown tests that main does not panic nor runs indefinitely when the APM server is inactive.
func TestAPMServerDown(t *testing.T) {
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
apmServer.Close()

func (suite *MainUnitTestsSuite) TestAPMServerDown() {
suite.apmServer.Close()
eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
}

// TestAPMServerHangs tests that main does not panic nor runs indefinitely when the APM server does not respond.
func TestAPMServerHangs(t *testing.T) {
extension.SetApmServerTransportState(extension.Healthy, context.Background())
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

func (suite *MainUnitTestsSuite) TestAPMServerHangs() {
eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(Hangs)))
apmServerInternals.UnlockSignalChannel <- struct{}{}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs)))
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
}

// TestAPMServerRecovery tests a scenario where the APM server recovers after hanging.
// The default forwarder timeout is 3 seconds, so we wait 5 seconds until we unlock that hanging state.
// Hence, the APM server is waked up just in time to process the TimelyResponse data frame.
func TestAPMServerRecovery(t *testing.T) {
extension.SetApmServerTransportState(extension.Healthy, context.Background())
func (suite *MainUnitTestsSuite) TestAPMServerRecovery() {
if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "1"); err != nil {
t.Fail()
suite.T().Fail()
}
if err := os.Setenv("ELASTIC_APM_LOG_LEVEL", "trace"); err != nil {
t.Fail()
suite.T().Fail()
}
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5},
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
eventQueueGenerator(eventsChain, suite.eventsChannel)
go func() {
time.Sleep(2500 * time.Millisecond) // Cannot multiply time.Second by a float
apmServerInternals.UnlockSignalChannel <- struct{}{}
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
}()
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(Hangs)))
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs)))
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", ""); err != nil {
t.Fail()
suite.T().Fail()
}
}

// TestGracePeriodHangs verifies that the WaitforGracePeriod goroutine ends when main() ends.
// This can be checked by asserting that apmTransportStatus is pending after the execution of main.
func TestGracePeriodHangs(t *testing.T) {
func (suite *MainUnitTestsSuite) TestGracePeriodHangs() {
extension.ApmServerTransportState.Status = extension.Pending
extension.ApmServerTransportState.ReconnectionCount = 100
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)

time.Sleep(100 * time.Millisecond)
apmServerInternals.UnlockSignalChannel <- struct{}{}
defer assert.Equal(t, extension.IsTransportStatusHealthyOrPending(), true)
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
defer assert.Equal(suite.T(), extension.IsTransportStatusHealthyOrPending(), true)
}

// TestAPMServerCrashesDuringExecution tests that main does not panic nor runs indefinitely when the APM server crashes
// during execution.
func TestAPMServerCrashesDuringExecution(t *testing.T) {
eventsChannel := make(chan MockEvent, 100)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

func (suite *MainUnitTestsSuite) TestAPMServerCrashesDuringExecution() {
eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(Crashes)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Crashes)))
}

// TestFullChannel checks that an overload of APM data chunks is handled correctly, events dropped beyond the 100th one
// if no space left in channel, no panic, no infinite hanging.
func TestFullChannel(t *testing.T) {
eventsChannel := make(chan MockEvent, 1000)
lambdaServer, apmServer, apmServerInternals := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

func (suite *MainUnitTestsSuite) TestFullChannel() {
eventsChain := []MockEvent{
{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
}

// TestFullChannelSlowAPMServer tests what happens when the APM Data channel is full and the APM server is slow
// (send strategy : background)
func TestFullChannelSlowAPMServer(t *testing.T) {
func (suite *MainUnitTestsSuite) TestFullChannelSlowAPMServer() {
if err := os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background"); err != nil {
t.Fail()
suite.T().Fail()
}
eventsChannel := make(chan MockEvent, 1000)
lambdaServer, apmServer, _ := initMockServers(eventsChannel)
defer lambdaServer.Close()
defer apmServer.Close()

eventsChain := []MockEvent{
{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5},
}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
// The test should not hang
if err := os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush"); err != nil {
t.Fail()
suite.T().Fail()
}
}