Skip to content

[exporter/elasticsearch] Feed the symbolization queues #38577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 19, 2025
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ func (e *elasticsearchExporter) pushProfileRecord(
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
case otelserializer.ExecutablesIndex:
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
case otelserializer.ExecutablesSymQueueIndex, otelserializer.LeafFramesSymQueueIndex:
// These regular indices have a low write-frequency and can share the executablesSession.
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
default:
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ package otelserializer // import "github.com/open-telemetry/opentelemetry-collec
import (
"bytes"
"encoding/json"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pprofile"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"
)

// now returns the current time. It is a package-level variable rather than a
// direct time.Now call so that tests can substitute a fixed clock and get
// deterministic timestamps in the serialized documents.
var now = time.Now

// Names of the indices that serialized profiling documents are pushed to.
const (
AllEventsIndex = "profiling-events-all"
StackTraceIndex = "profiling-stacktraces"
StackFrameIndex = "profiling-stackframes"
ExecutablesIndex = "profiling-executables"

// Symbolization queue indices. SerializeProfile pushes one document per
// unsymbolized executable into ExecutablesSymQueueIndex and one document per
// unsymbolized leaf frame into LeafFramesSymQueueIndex.
ExecutablesSymQueueIndex = "profiling-sq-executables"
LeafFramesSymQueueIndex = "profiling-sq-leafframes"
)

// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
Expand All @@ -35,6 +41,8 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
return err
}

nowTime := now()

for _, payload := range data {
event := payload.StackTraceEvent

Expand Down Expand Up @@ -64,6 +72,32 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
return err
}
}

for _, frame := range payload.UnsymbolizedLeafFrames {
docID := frame.String()
doc := serializeprofiles.LeafFrameSymbolizationData{
FrameID: []string{docID},
Created: nowTime,
Next: nowTime,
Retries: 0,
}
if err = pushDataAsJSON(doc, docID, LeafFramesSymQueueIndex); err != nil {
return err
}
}

for fileID := range payload.UnsymbolizedExecutables {
docID := fileID.Base64()
doc := serializeprofiles.ExecutableSymbolizationData{
FileID: []string{docID},
Created: nowTime,
Next: nowTime,
Retries: 0,
}
if err = pushDataAsJSON(doc, docID, ExecutablesSymQueueIndex); err != nil {
return err
}
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
)

func TestSerializeProfile(t *testing.T) {
nowTime := time.Now()
nowTimeStr := nowTime.Format(time.RFC3339Nano)
tests := []struct {
name string
profileCustomizer func(resource pcommon.Resource, scope pcommon.InstrumentationScope, record pprofile.Profile)
Expand Down Expand Up @@ -80,6 +82,20 @@ func TestSerializeProfile(t *testing.T) {
"scripted_upsert": true,
"upsert": map[string]any{},
},
{
"Stacktrace.frame.id": []any{"YA3K_koRAADyvzjEk_X7kgAAAAAAAABv"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": nowTimeStr,
"Time.created": nowTimeStr,
"ecs.version": "",
},
{
"Executable.file.id": []any{"YA3K_koRAADyvzjEk_X7kg"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": nowTimeStr,
"Time.created": nowTimeStr,
"ecs.version": "",
},
{
"@timestamp": "1970-01-01T00:00:00Z",
"Stacktrace.count": json.Number("1"),
Expand All @@ -100,6 +116,14 @@ func TestSerializeProfile(t *testing.T) {
tt.profileCustomizer(resource.Resource(), scope.Scope(), profile)
profiles.MarkReadOnly()

nowOld := now
t.Cleanup(func() {
now = nowOld
})
now = func() time.Time {
return nowTime
}

buf := []*bytes.Buffer{}
err := SerializeProfile(resource.Resource(), scope.Scope(), profile, func(b *bytes.Buffer, _ string, _ string) error {
buf = append(buf, b)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
"time"

"go.opentelemetry.io/ebpf-profiler/libpf"
)

Expand All @@ -27,7 +29,8 @@ type StackPayload struct {
StackFrames []StackFrame
Executables []ExeMetadata

UnsymbolizedLeafFrames []libpf.FrameID
UnsymbolizedLeafFrames []libpf.FrameID
UnsymbolizedExecutables map[libpf.FileID]struct{}
}

// StackTraceEvent represents a stacktrace event serializable into ES.
Expand Down Expand Up @@ -130,3 +133,23 @@ func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) Exe
},
}
}

// ExecutableSymbolizationData is the document written into the executable
// symbolization queue index. Besides the executable file IDs it carries the
// queue bookkeeping fields: creation time, next time and retry count.
type ExecutableSymbolizationData struct {
// EcsVersion is embedded; its type is declared elsewhere in this package.
EcsVersion
// FileID holds the executable file IDs this queue entry refers to.
FileID []string `json:"Executable.file.id"`
// Created is the time at which the queue entry was created.
Created time.Time `json:"Time.created"`
// Next is presumably the earliest time of the next symbolization attempt
// (inferred from the JSON field name) — TODO confirm with the queue consumer.
Next time.Time `json:"Symbolization.time.next"`
// Retries is presumably the number of symbolization attempts made so far
// (inferred from the JSON field name) — TODO confirm with the queue consumer.
Retries int `json:"Symbolization.retries"`
}

// LeafFrameSymbolizationData is the document written into the leaf frame
// symbolization queue index. Besides the frame IDs it carries the queue
// bookkeeping fields: creation time, next time and retry count.
type LeafFrameSymbolizationData struct {
// EcsVersion is embedded; its type is declared elsewhere in this package.
EcsVersion
// FrameID holds the leaf frame IDs this queue entry refers to.
FrameID []string `json:"Stacktrace.frame.id"`
// Created is the time at which the queue entry was created.
Created time.Time `json:"Time.created"`
// Next is presumably the earliest time of the next symbolization attempt
// (inferred from the JSON field name) — TODO confirm with the queue consumer.
Next time.Time `json:"Symbolization.time.next"`
// Retries is presumably the number of symbolization attempts made so far
// (inferred from the JSON field name) — TODO confirm with the queue consumer.
Retries int `json:"Symbolization.retries"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func checkProfileType(profile pprofile.Profile) error {
// ScopeProfiles, and ProfileContainer.
func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len())
unsymbolizedExecutables := make(map[libpf.FileID]struct{})
stackPayload := make([]StackPayload, 0, profile.Sample().Len())

hostMetadata := newHostMetadata(resource, scope, profile)
Expand Down Expand Up @@ -111,6 +112,22 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame)
}

for j := range frames {
if frameTypes[j].IsError() {
// Artificial error frames can't be symbolized.
continue
}
if isFrameSymbolized(frames[j]) {
// Skip interpreted frames and already symbolized native frames (kernel, Golang is planned).
continue
}
frameID, err := libpf.NewFrameIDFromString(frames[j].DocID)
if err != nil {
return nil, fmt.Errorf("stackPayloads: %w", err)
}
unsymbolizedExecutables[frameID.FileID()] = struct{}{}
}

// Add one event per timestamp and its count value.
for j := 0; j < sample.TimestampsUnixNano().Len(); j++ {
t := sample.TimestampsUnixNano().At(j)
Expand Down Expand Up @@ -139,6 +156,7 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
stackPayload[0].Executables = exeMetadata
}
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames
stackPayload[0].UnsymbolizedExecutables = unsymbolizedExecutables
}

return stackPayload, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func TestTransform(t *testing.T) {
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
},
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
buildID: {},
buildID2: {},
},
},
{
StackTraceEvent: StackTraceEvent{
Expand Down Expand Up @@ -328,6 +332,10 @@ func TestStackPayloads(t *testing.T) {
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
},
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
buildID: {},
buildID2: {},
},
},
{
StackTraceEvent: StackTraceEvent{
Expand Down Expand Up @@ -413,6 +421,10 @@ func TestStackPayloads(t *testing.T) {
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
},
UnsymbolizedExecutables: map[libpf.FileID]struct{}{
buildID: {},
buildID2: {},
},
},
{
StackTraceEvent: StackTraceEvent{
Expand Down