Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 9 additions & 47 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package beater

import (
"bytes"
"compress/zlib"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -37,25 +34,16 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

var validSourcemap, _ = os.ReadFile("../../testdata/sourcemap/bundle.js.map")

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
initCh := make(chan struct{})
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
switch r.URL.Path {
case "/.apm-source-map":
// ping request from the metadata fetcher.
// Send a status ok
w.WriteHeader(http.StatusOK)
case "/.apm-source-map/_search":
// search request from the metadata fetcher
m := sourcemapSearchResponseBody("app", "1.0", "/bundle/path")
w.Write(m)
case "/.apm-source-map/_doc/app-1.0-/bundle/path":
m := sourcemapGetResponseBody(true, validSourcemap)
w.Write(m)
called = true
close(initCh)
default:
w.WriteHeader(http.StatusTeapot)
t.Fatalf("unhandled request path: %s", r.URL.Path)
Expand All @@ -70,19 +58,18 @@ func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

fetcher, cancel, err := newSourcemapFetcher(
_, cancel, err := newSourcemapFetcher(
cfg.RumConfig.SourceMapping,
nil, elasticsearch.NewClient,
)
require.NoError(t, err)
defer cancel()
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
c, err := fetcher.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)
require.NotNil(t, c)

assert.True(t, called)
select {
case <-initCh:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for metadata fetcher init to complete")
}
}

func sourcemapSearchResponseBody(name string, version string, bundlePath string) []byte {
Expand Down Expand Up @@ -114,31 +101,6 @@ func sourcemapSearchResponseBody(name string, version string, bundlePath string)
return data
}

func sourcemapGetResponseBody(found bool, b []byte) []byte {
result := map[string]interface{}{
"found": found,
"_source": map[string]interface{}{
"content": encodeSourcemap(b),
},
}

data, err := json.Marshal(result)
if err != nil {
panic(err)
}
return data
}

func encodeSourcemap(sourcemap []byte) string {
b := &bytes.Buffer{}

z := zlib.NewWriter(b)
z.Write(sourcemap)
z.Close()

return base64.StdEncoding.EncodeToString(b.Bytes())
}

func TestQueryClusterUUIDRegistriesExist(t *testing.T) {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
stateRegistry.Clear()
Expand Down
8 changes: 0 additions & 8 deletions internal/sourcemap/chained.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package sourcemap
import (
"context"
"errors"
"time"

"github.com/go-sourcemap/sourcemap"
)
Expand All @@ -44,13 +43,6 @@ func (c ChainedFetcher) Fetch(ctx context.Context, name, version, path string) (
return consumer, err
}

// previous fetcher is unavailable but the deadline expired so we cannot reuse that
if t, _ := ctx.Deadline(); t.Before(time.Now()) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
}

// err is errFetcherUnvailable
// store it in a tmp variable and try the next fetcher
lastErr = err
Expand Down
53 changes: 6 additions & 47 deletions internal/sourcemap/metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import (
"github.com/elastic/go-elasticsearch/v8/esapi"
)

const (
syncTimeout = 10 * time.Second
)

type MetadataESFetcher struct {
esClient *elasticsearch.Client
index string
Expand Down Expand Up @@ -97,34 +93,14 @@ func (s *MetadataESFetcher) err() error {
}
}

func (s *MetadataESFetcher) startBackgroundSync(parent context.Context) {
func (s *MetadataESFetcher) startBackgroundSync(ctx context.Context) {
go func() {
s.logger.Debug("populating metadata cache")

ctx, cancel := context.WithTimeout(parent, 1*time.Second)
err := s.ping(ctx)
cancel()

if err != nil {
// it is fine to not lock here since err will not access
// initErr until the init channel is closed.
s.initErr = fmt.Errorf("failed to ping es cluster: %w: %v", errFetcherUnvailable, err)
// First run, populate cache
if err := s.sync(ctx); err != nil {
s.initErr = fmt.Errorf("failed to populate sourcemap metadata: %w", err)
s.logger.Error(s.initErr)
} else {
// First run, populate cache
ctx, cancel = context.WithTimeout(parent, syncTimeout)
err := s.sync(ctx)
cancel()

s.initErr = err

if err != nil {
s.logger.Errorf("failed to fetch sourcemaps metadata: %v", err)
} else {
// only close the init chan and mark the fetcher as ready if
// sync succeeded
s.logger.Info("init routine completed")
}
}

close(s.init)
Expand All @@ -135,14 +111,10 @@ func (s *MetadataESFetcher) startBackgroundSync(parent context.Context) {
for {
select {
case <-t.C:
ctx, cancel := context.WithTimeout(parent, syncTimeout)

if err := s.sync(ctx); err != nil {
s.logger.Errorf("failed to sync sourcemaps metadata: %v", err)
}

cancel()
case <-parent.Done():
case <-ctx.Done():
s.logger.Info("update routine done")
// close invalidation channel
close(s.invalidationChan)
Expand All @@ -152,19 +124,6 @@ func (s *MetadataESFetcher) startBackgroundSync(parent context.Context) {
}()
}

func (s *MetadataESFetcher) ping(ctx context.Context) error {
// we cannot use PingRequest because the library is
// building a broken url and the request is timing out.
req := esapi.IndicesGetRequest{
Index: []string{s.index},
}
resp, err := req.Do(ctx, s.esClient)
if err == nil {
resp.Body.Close()
}
return err
}

func (s *MetadataESFetcher) sync(ctx context.Context) error {
sourcemaps := make(map[identifier]string)

Expand Down Expand Up @@ -266,7 +225,7 @@ func (s *MetadataESFetcher) update(ctx context.Context, sourcemaps map[identifie
func (s *MetadataESFetcher) initialSearch(ctx context.Context, updates map[identifier]string) (*esSearchSourcemapResponse, error) {
resp, err := s.runSearchQuery(ctx)
if err != nil {
return nil, fmt.Errorf("failed to run initial search query: %w", err)
return nil, fmt.Errorf("failed to run initial search query: %w: %v", errFetcherUnvailable, err)
}
defer resp.Body.Close()

Expand Down
2 changes: 2 additions & 0 deletions internal/sourcemap/sourcemap_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (s *SourcemapFetcher) Fetch(ctx context.Context, name, version, path string
}
case <-ctx.Done():
return nil, fmt.Errorf("error waiting for metadata fetcher to be ready: %w", ctx.Err())
default:
return nil, fmt.Errorf("metadata fetcher is not ready: %w", errFetcherUnvailable)
}

if i, ok := s.metadata.getID(original); ok {
Expand Down
6 changes: 0 additions & 6 deletions systemtest/approvals/TestNoMatchingSourcemap.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@
"line": {
"column": 18,
"number": 1
},
"sourcemap": {
"error": "unable to find sourcemap.url for service.name=apm-agent-js service.version=1.0.0 bundle.path=http://subdomain1.localhost:8000/test/e2e/general-usecase/bundle.js.map"
}
},
{
Expand All @@ -65,9 +62,6 @@
"line": {
"column": 18,
"number": 1
},
"sourcemap": {
"error": "unable to find sourcemap.url for service.name=apm-agent-js service.version=1.0.0 bundle.path=http://subdomain2.localhost:8000/test/e2e/general-usecase/bundle.js.map"
}
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,6 @@
"line": {
"column": 9,
"number": 7662
},
"sourcemap": {
"error": "unable to find sourcemap.url for service.name=apm-a-rum-test-e2e-general-usecase service.version=0.0.1 bundle.path=http://localhost:8000/test/e2e/general-usecase/app.e2e-bundle.min.js"
}
},
{
Expand All @@ -450,9 +447,6 @@
"line": {
"column": 3,
"number": 7666
},
"sourcemap": {
"error": "unable to find sourcemap.url for service.name=apm-a-rum-test-e2e-general-usecase service.version=0.0.1 bundle.path=http://localhost:8000/test/e2e/general-usecase/app.e2e-bundle.min.js"
}
}
],
Expand Down
10 changes: 0 additions & 10 deletions systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package systemtest
import (
"context"
"net/url"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -102,15 +101,6 @@ func cleanupElasticsearch() error {
},
ExpandWildcards: "all",
}, nil)
if err != nil {
return err
}

_, err = Elasticsearch.Do(context.Background(), &esapi.DeleteByQueryRequest{
Index: []string{".apm-source-map"},
Body: strings.NewReader(`{"query": { "match_all": {}}}`),
Conflicts: "proceed",
}, nil)
return err
}

Expand Down
22 changes: 22 additions & 0 deletions systemtest/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,5 +314,27 @@ func CreateSourceMap(t testing.TB, sourcemap []byte, serviceName, serviceVersion
Value: id,
})

t.Cleanup(func() {
DeleteSourceMap(t, result.ID)
})

return result.ID
}

// DeleteSourceMap deletes a source map with the given ID.
func DeleteSourceMap(t testing.TB, id string) {
t.Helper()

url := *KibanaURL
url.Path += "/api/apm/sourcemaps/" + id
req, _ := http.NewRequest("DELETE", url.String(), nil)
req.Header.Set("kbn-xsrf", "1")

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, string(respBody))
}
2 changes: 2 additions & 0 deletions systemtest/sourcemap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestNoMatchingSourcemap(t *testing.T) {

srv := apmservertest.NewUnstartedServerTB(t)
srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true}
srv.Config.Kibana.Enabled = false
err = srv.Start()
require.NoError(t, err)

Expand Down Expand Up @@ -141,6 +142,7 @@ func TestSourcemapCaching(t *testing.T) {

srv := apmservertest.NewUnstartedServerTB(t)
srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true}
srv.Config.Kibana.Enabled = false
err = srv.Start()
require.NoError(t, err)

Expand Down