Skip to content

Commit 0d590dd

Browse files
committed
[chore][exporter/elasticsearch]Feed the symbolization queues
1 parent 314888a commit 0d590dd

File tree

6 files changed

+115
-1
lines changed

6 files changed

+115
-1
lines changed

exporter/elasticsearchexporter/exporter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,9 @@ func (e *elasticsearchExporter) pushProfileRecord(
589589
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
590590
case otelserializer.ExecutablesIndex:
591591
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
592+
case otelserializer.ExecutablesSymQueueIndex, otelserializer.LeafFramesSymQueueIndex:
593+
// These regular indices have a low write-frequency and can share the executablesSession.
594+
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
592595
default:
593596
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
594597
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,24 @@ package otelserializer // import "github.com/open-telemetry/opentelemetry-collec
66
import (
77
"bytes"
88
"encoding/json"
9+
"time"
910

1011
"go.opentelemetry.io/collector/pdata/pcommon"
1112
"go.opentelemetry.io/collector/pdata/pprofile"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"
1415
)
1516

17+
var now = time.Now
18+
1619
const (
1720
AllEventsIndex = "profiling-events-all"
1821
StackTraceIndex = "profiling-stacktraces"
1922
StackFrameIndex = "profiling-stackframes"
2023
ExecutablesIndex = "profiling-executables"
24+
25+
ExecutablesSymQueueIndex = "profiling-sq-executables"
26+
LeafFramesSymQueueIndex = "profiling-sq-leafframes"
2127
)
2228

2329
// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
@@ -35,6 +41,8 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
3541
return err
3642
}
3743

44+
nowTime := now()
45+
3846
for _, payload := range data {
3947
event := payload.StackTraceEvent
4048

@@ -64,6 +72,32 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
6472
return err
6573
}
6674
}
75+
76+
for _, frame := range payload.UnsymbolizedLeafFrames {
77+
docID := frame.String()
78+
doc := serializeprofiles.LeafFrameSymbolizationData{
79+
FrameID: []string{docID},
80+
Created: nowTime,
81+
Next: nowTime,
82+
Retries: 0,
83+
}
84+
if err = pushDataAsJSON(doc, docID, LeafFramesSymQueueIndex); err != nil {
85+
return err
86+
}
87+
}
88+
89+
for fileID := range payload.UnsymbolizedExecutables {
90+
docID := fileID.Base64()
91+
doc := serializeprofiles.ExecutableSymbolizationData{
92+
FileID: []string{docID},
93+
Created: nowTime,
94+
Next: nowTime,
95+
Retries: 0,
96+
}
97+
if err = pushDataAsJSON(doc, docID, ExecutablesSymQueueIndex); err != nil {
98+
return err
99+
}
100+
}
67101
}
68102
return nil
69103
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
)
2020

2121
func TestSerializeProfile(t *testing.T) {
22+
nowTime := time.Now()
23+
nowTimeStr := nowTime.Format(time.RFC3339Nano)
2224
tests := []struct {
2325
name string
2426
profileCustomizer func(resource pcommon.Resource, scope pcommon.InstrumentationScope, record pprofile.Profile)
@@ -80,6 +82,20 @@ func TestSerializeProfile(t *testing.T) {
8082
"scripted_upsert": true,
8183
"upsert": map[string]any{},
8284
},
85+
{
86+
"Stacktrace.frame.id": []any{"YA3K_koRAADyvzjEk_X7kgAAAAAAAABv"},
87+
"Symbolization.retries": json.Number("0"),
88+
"Symbolization.time.next": nowTimeStr,
89+
"Time.created": nowTimeStr,
90+
"ecs.version": "",
91+
},
92+
{
93+
"Executable.file.id": []any{"YA3K_koRAADyvzjEk_X7kg"},
94+
"Symbolization.retries": json.Number("0"),
95+
"Symbolization.time.next": nowTimeStr,
96+
"Time.created": nowTimeStr,
97+
"ecs.version": "",
98+
},
8399
{
84100
"@timestamp": "1970-01-01T00:00:00Z",
85101
"Stacktrace.count": json.Number("1"),
@@ -100,6 +116,14 @@ func TestSerializeProfile(t *testing.T) {
100116
tt.profileCustomizer(resource.Resource(), scope.Scope(), profile)
101117
profiles.MarkReadOnly()
102118

119+
nowOld := now
120+
t.Cleanup(func() {
121+
now = nowOld
122+
})
123+
now = func() time.Time {
124+
return nowTime
125+
}
126+
103127
buf := []*bytes.Buffer{}
104128
err := SerializeProfile(resource.Resource(), scope.Scope(), profile, func(b *bytes.Buffer, _ string, _ string) error {
105129
buf = append(buf, b)

exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"
55

66
import (
7+
"time"
8+
79
"go.opentelemetry.io/ebpf-profiler/libpf"
810
)
911

@@ -27,7 +29,8 @@ type StackPayload struct {
2729
StackFrames []StackFrame
2830
Executables []ExeMetadata
2931

30-
UnsymbolizedLeafFrames []libpf.FrameID
32+
UnsymbolizedLeafFrames []libpf.FrameID
33+
UnsymbolizedExecutables map[libpf.FileID]struct{}
3134
}
3235

3336
// StackTraceEvent represents a stacktrace event serializable into ES.
@@ -130,3 +133,23 @@ func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) Exe
130133
},
131134
}
132135
}
136+
137+
// ExecutableSymbolizationData represents an array of executable FileIDs written into the
138+
// executable symbolization queue index.
139+
type ExecutableSymbolizationData struct {
140+
EcsVersion
141+
FileID []string `json:"Executable.file.id"`
142+
Created time.Time `json:"Time.created"`
143+
Next time.Time `json:"Symbolization.time.next"`
144+
Retries int `json:"Symbolization.retries"`
145+
}
146+
147+
// LeafFrameSymbolizationData represents an array of frame IDs written into the
148+
// leaf frame symbolization queue index.
149+
type LeafFrameSymbolizationData struct {
150+
EcsVersion
151+
FrameID []string `json:"Stacktrace.frame.id"`
152+
Created time.Time `json:"Time.created"`
153+
Next time.Time `json:"Symbolization.time.next"`
154+
Retries int `json:"Symbolization.retries"`
155+
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func checkProfileType(profile pprofile.Profile) error {
7878
// ScopeProfiles, and ProfileContainer.
7979
func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
8080
unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len())
81+
unsymbolizedExecutables := make(map[libpf.FileID]struct{})
8182
stackPayload := make([]StackPayload, 0, profile.Sample().Len())
8283

8384
hostMetadata := newHostMetadata(resource, scope, profile)
@@ -111,6 +112,22 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
111112
unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame)
112113
}
113114

115+
for j := range frames {
116+
if frameTypes[j].IsError() {
117+
// Artificial error frames can't be symbolized.
118+
continue
119+
}
120+
if isFrameSymbolized(frames[j]) {
121+
// Skip interpreted frames and already symbolized native frames (kernel, Golang is planned).
122+
continue
123+
}
124+
frameID, err := libpf.NewFrameIDFromString(frames[j].DocID)
125+
if err != nil {
126+
return nil, fmt.Errorf("stackPayloads: %w", err)
127+
}
128+
unsymbolizedExecutables[frameID.FileID()] = struct{}{}
129+
}
130+
114131
// Add one event per timestamp and its count value.
115132
for j := 0; j < sample.TimestampsUnixNano().Len(); j++ {
116133
t := sample.TimestampsUnixNano().At(j)
@@ -139,6 +156,7 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
139156
stackPayload[0].Executables = exeMetadata
140157
}
141158
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames
159+
stackPayload[0].UnsymbolizedExecutables = unsymbolizedExecutables
142160
}
143161

144162
return stackPayload, nil

exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ func TestTransform(t *testing.T) {
216216
UnsymbolizedLeafFrames: []libpf.FrameID{
217217
libpf.NewFrameID(buildID, address),
218218
},
219+
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
220+
buildID: {},
221+
buildID2: {},
222+
},
219223
},
220224
{
221225
StackTraceEvent: StackTraceEvent{
@@ -328,6 +332,10 @@ func TestStackPayloads(t *testing.T) {
328332
UnsymbolizedLeafFrames: []libpf.FrameID{
329333
libpf.NewFrameID(buildID, address),
330334
},
335+
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
336+
buildID: {},
337+
buildID2: {},
338+
},
331339
},
332340
{
333341
StackTraceEvent: StackTraceEvent{
@@ -413,6 +421,10 @@ func TestStackPayloads(t *testing.T) {
413421
UnsymbolizedLeafFrames: []libpf.FrameID{
414422
libpf.NewFrameID(buildID, address),
415423
},
424+
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
425+
buildID: {},
426+
buildID2: {},
427+
},
416428
},
417429
{
418430
StackTraceEvent: StackTraceEvent{

0 commit comments

Comments
 (0)