Skip to content

Commit 2f83bff

Browse files
dmathieurockdabootcarsonip
authored andcommitted
elasticsearchexporter: Introduce LRU cache for profiles (open-telemetry#38606)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This introduces an LRU cache for profiles, so we don't overwhelm Elasticsearch with duplicate documents. --------- Co-authored-by: Tim Rühsen <[email protected]> Co-authored-by: Carson Ip <[email protected]>
1 parent facd2bc commit 2f83bff

File tree

13 files changed

+437
-48
lines changed

13 files changed

+437
-48
lines changed

.chloggen/elasticsearch-lru.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Introduce LRU cache for profiles
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38606]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ go 1.23.0
44

55
require (
66
github.com/cenkalti/backoff/v4 v4.3.0
7+
github.com/cespare/xxhash v1.1.0
78
github.com/elastic/go-docappender/v2 v2.9.0
89
github.com/elastic/go-elasticsearch/v8 v8.17.1
10+
github.com/elastic/go-freelru v0.16.0
911
github.com/elastic/go-structform v0.0.12
1012
github.com/klauspost/compress v1.18.0
1113
github.com/lestrrat-go/strftime v1.1.0
1214
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.124.1
1315
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.124.1
16+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.124.1
1417
github.com/stretchr/testify v1.10.0
1518
github.com/tidwall/gjson v1.18.0
1619
go.opentelemetry.io/collector/client v1.30.0
@@ -41,10 +44,10 @@ require (
4144
require (
4245
github.com/armon/go-radix v1.0.0 // indirect
4346
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
47+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4448
github.com/cilium/ebpf v0.16.0 // indirect
4549
github.com/davecgh/go-spew v1.1.1 // indirect
4650
github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect
47-
github.com/elastic/go-freelru v0.16.0 // indirect
4851
github.com/elastic/go-sysinfo v1.15.3 // indirect
4952
github.com/elastic/go-windows v1.0.2 // indirect
5053
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -66,6 +69,7 @@ require (
6669
github.com/mitchellh/reflectwalk v1.0.2 // indirect
6770
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
6871
github.com/modern-go/reflect2 v1.0.2 // indirect
72+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.124.1 // indirect
6973
github.com/pierrec/lz4/v4 v4.1.22 // indirect
7074
github.com/pkg/errors v0.9.1 // indirect
7175
github.com/pmezard/go-difflib v1.0.0 // indirect

exporter/elasticsearchexporter/go.sum

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/elasticsearchexporter/integrationtest/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ require (
4343
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
4444
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
4545
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
46+
github.com/cespare/xxhash v1.1.0 // indirect
4647
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4748
github.com/cilium/ebpf v0.16.0 // indirect
4849
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect

exporter/elasticsearchexporter/integrationtest/go.sum

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package lru // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru"
5+
6+
import (
7+
"time"
8+
9+
"github.com/cespare/xxhash"
10+
"github.com/elastic/go-freelru"
11+
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
12+
)
13+
14+
type void struct{}
15+
16+
func stringHashFn(s string) uint32 {
17+
return uint32(xxhash.Sum64String(s))
18+
}
19+
20+
// LockedLRUSet is the interface provided to the LRUSet once a lock has been
21+
// acquired.
22+
type LockedLRUSet interface {
23+
// CheckAndAdd checks whether the entry is already stored in the cache, and
24+
// adds it.
25+
// It returns whether the entry should be excluded, as it was already present
26+
// in cache.
27+
CheckAndAdd(entry string) bool
28+
}
29+
30+
// LRUSet is an LRU cache implementation that allows acquiring a lock, and
31+
// checking whether specific keys have already been stored.
32+
type LRUSet struct {
33+
syncMu *xsync.RWMutex[*freelru.LRU[string, void]]
34+
}
35+
36+
func (l *LRUSet) WithLock(fn func(LockedLRUSet) error) error {
37+
lru := l.syncMu.WLock()
38+
defer l.syncMu.WUnlock(&lru)
39+
40+
return fn(lockedLRUSet{*lru})
41+
}
42+
43+
func NewLRUSet(size uint32, rollover time.Duration) (*LRUSet, error) {
44+
lru, err := freelru.New[string, void](size, stringHashFn)
45+
if err != nil {
46+
return nil, err
47+
}
48+
lru.SetLifetime(rollover)
49+
50+
syncMu := xsync.NewRWMutex(lru)
51+
return &LRUSet{syncMu: &syncMu}, nil
52+
}
53+
54+
type lockedLRUSet struct {
55+
lru *freelru.LRU[string, void]
56+
}
57+
58+
func (l lockedLRUSet) CheckAndAdd(entry string) (excluded bool) {
59+
if _, exclude := (l.lru).Get(entry); exclude {
60+
return true
61+
}
62+
(l.lru).Add(entry, void{})
63+
return false
64+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package lru
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestLRUSet(t *testing.T) {
15+
cache, err := NewLRUSet(5, time.Minute)
16+
require.NoError(t, err)
17+
18+
err = cache.WithLock(func(lock LockedLRUSet) error {
19+
assert.False(t, lock.CheckAndAdd("a"))
20+
assert.True(t, lock.CheckAndAdd("a"))
21+
assert.False(t, lock.CheckAndAdd("b"))
22+
23+
assert.InDelta(t, 0.0, testing.AllocsPerRun(5, func() {
24+
_ = lock.CheckAndAdd("c")
25+
}), 0)
26+
27+
return nil
28+
})
29+
30+
assert.NoError(t, err)
31+
}
32+
33+
func TestLRUSetLifeTime(t *testing.T) {
34+
const lifetime = 100 * time.Millisecond
35+
cache, err := NewLRUSet(5, lifetime)
36+
require.NoError(t, err)
37+
38+
err = cache.WithLock(func(lock LockedLRUSet) error {
39+
assert.False(t, lock.CheckAndAdd("a"))
40+
assert.True(t, lock.CheckAndAdd("a"))
41+
return nil
42+
})
43+
require.NoError(t, err)
44+
45+
// Wait until cache item is expired.
46+
time.Sleep(lifetime)
47+
err = cache.WithLock(func(lock LockedLRUSet) error {
48+
assert.False(t, lock.CheckAndAdd("a"))
49+
assert.True(t, lock.CheckAndAdd("a"))
50+
return nil
51+
})
52+
require.NoError(t, err)
53+
54+
// Wait 50% of the lifetime, so the item is not expired.
55+
time.Sleep(lifetime / 2)
56+
err = cache.WithLock(func(lock LockedLRUSet) error {
57+
assert.True(t, lock.CheckAndAdd("a"))
58+
return nil
59+
})
60+
require.NoError(t, err)
61+
62+
// Wait another 50% of the lifetime, so the item should be expired.
63+
time.Sleep(lifetime / 2)
64+
err = cache.WithLock(func(lock LockedLRUSet) error {
65+
assert.False(t, lock.CheckAndAdd("a"))
66+
return nil
67+
})
68+
require.NoError(t, err)
69+
}
70+
71+
func BenchmarkLRUSetCheck(b *testing.B) {
72+
cache, err := NewLRUSet(5, time.Minute)
73+
require.NoError(b, err)
74+
75+
_ = cache.WithLock(func(lock LockedLRUSet) error {
76+
b.ReportAllocs()
77+
b.ResetTimer()
78+
for i := 0; i < b.N; i++ {
79+
lock.CheckAndAdd("a")
80+
}
81+
82+
return nil
83+
})
84+
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/logs_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010

1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1213
"go.opentelemetry.io/collector/pdata/pcommon"
1314
"go.opentelemetry.io/collector/pdata/plog"
1415

@@ -185,8 +186,9 @@ func TestSerializeLog(t *testing.T) {
185186
logs.MarkReadOnly()
186187

187188
var buf bytes.Buffer
188-
ser := New()
189-
err := ser.SerializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, elasticsearch.Index{}, &buf)
189+
ser, err := New()
190+
require.NoError(t, err)
191+
err = ser.SerializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, elasticsearch.Index{}, &buf)
190192
if (err != nil) != tt.wantErr {
191193
t.Errorf("Log() error = %v, wantErr %v", err, tt.wantErr)
192194
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010

1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1213
"go.opentelemetry.io/collector/pdata/pmetric"
1314

1415
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
@@ -32,8 +33,9 @@ func TestSerializeMetricsConflict(t *testing.T) {
3233

3334
var validationErrors []error
3435
var buf bytes.Buffer
35-
ser := New()
36-
_, err := ser.SerializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, elasticsearch.Index{}, &buf)
36+
ser, err := New()
37+
require.NoError(t, err)
38+
_, err = ser.SerializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, elasticsearch.Index{}, &buf)
3739
if err != nil {
3840
t.Errorf("Metrics() error = %v", err)
3941
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/otelserializer.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,35 @@
33

44
package otelserializer // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer"
55

6-
type Serializer struct{}
6+
import (
7+
"sync"
8+
"time"
9+
10+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru"
11+
)
12+
13+
const (
14+
knownExecutablesCacheSize = 16 * 1024
15+
knownFramesCacheSize = 128 * 1024
16+
knownTracesCacheSize = 128 * 1024
17+
knownUnsymbolizedFramesCacheSize = 128 * 1024
18+
knownUnsymbolizedExecutablesCacheSize = 16 * 1024
19+
20+
minILMRolloverTime = 3 * time.Hour
21+
)
22+
23+
type Serializer struct {
24+
// Data cache for profiles
25+
loadLRUsOnce sync.Once
26+
lruErr error
27+
knownTraces *lru.LRUSet
28+
knownFrames *lru.LRUSet
29+
knownExecutables *lru.LRUSet
30+
knownUnsymbolizedFrames *lru.LRUSet
31+
knownUnsymbolizedExecutables *lru.LRUSet
32+
}
733

834
// New builds a new Serializer
9-
func New() *Serializer {
10-
return &Serializer{}
35+
func New() (*Serializer, error) {
36+
return &Serializer{}, nil
1137
}

0 commit comments

Comments
 (0)