Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apmpackage/apm/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
- description: Add service destination overflow metrics
type: enhancement
link: https://github.com/elastic/apm-server/pull/10069
- description: Add service summary overflow metrics
type: enhancement
link: https://github.com/elastic/apm-server/pull/10061
- version: "8.6.0"
changes:
- description: Change `ecs.version` to a `constant_keyword` field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@
dynamic: true
description: |
Custom key/value pairs. Can be used to add meta information to events. Should not contain nested objects. All values are stored as scaled_float.
- name: service_summary.aggregation.overflow_count
type: long
description: Number of aggregation groups that overflowed for service summary metrics aggregation.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ https://github.com/elastic/apm-server/compare/8.6\...main[View commits]
- Add global labels to service destination metrics {pull}10056[10056]
- Service metrics and service summary aggregations are now always enabled {pull}10060[10060]
- Dedicated overflow buckets for service destination aggregation to limit cardinality {pull}10069[10069]
- Dedicated overflow buckets for service summary metrics {pull}10061[10061]
103 changes: 71 additions & 32 deletions x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"

Expand All @@ -23,7 +24,8 @@ import (
)

const (
metricsetName = "service_summary"
metricsetName = "service_summary"
overflowServiceName = "_other"
)

// AggregatorConfig holds configuration for creating an Aggregator.
Expand All @@ -44,8 +46,6 @@ type AggregatorConfig struct {
RollUpIntervals []time.Duration

// Interval is the interval between publishing of aggregated metrics.
// There may be additional metrics reported at arbitrary times if the
// aggregation groups fill up.
Interval time.Duration

// MaxGroups is the maximum number of distinct service summary metrics to
Expand Down Expand Up @@ -123,21 +123,39 @@ func (a *Aggregator) publish(ctx context.Context, period time.Duration) error {
a.active[period], a.inactive[period] = a.inactive[period], current
a.mu.Unlock()

size := len(current.m)
if size == 0 {
if current.entries == 0 {
a.config.Logger.Debugf("no service summary metrics to publish")
return nil
}

size := current.entries
if current.other != nil {
size++
}

intervalStr := interval.FormatDuration(period)
batch := make(model.Batch, 0, size)
for hash, entries := range current.m {
for _, key := range entries {
m := makeMetricset(*key, intervalStr)
for key, metrics := range current.m {
for _, entry := range metrics {
m := makeMetricset(*entry, intervalStr)
batch = append(batch, m)
}
delete(current.m, hash)
delete(current.m, key)
}
if current.other != nil {
m := makeMetricset(*current.other, intervalStr)
m.Metricset.Samples = append(m.Metricset.Samples, model.MetricsetSample{
Name: "service_summary.aggregation.overflow_count",
Value: float64(current.otherCardinalityEstimator.Estimate()),
})
batch = append(batch, m)
}

// Clean up everything.
current.entries = 0
current.other = nil
current.otherCardinalityEstimator = nil

a.config.Logger.Debugf("publishing %d metricsets", len(batch))
return a.config.BatchProcessor.ProcessBatch(ctx, &batch)
}
Expand All @@ -151,24 +169,26 @@ func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error {
if event.Processor == model.SpanProcessor {
continue
}
a.processEvent(&event, b)
a.processEvent(&event)
}
return nil
}

func (a *Aggregator) processEvent(event *model.APMEvent, b *model.Batch) {
for _, period := range a.Intervals {
key := makeAggregationKey(event, period)
if !a.active[period].storeOrUpdate(key, a.config.Logger) {
intervalStr := interval.FormatDuration(period)
*b = append(*b, makeMetricset(key, intervalStr))
}
// processEvent records event in the active metrics buffer of every
// configured aggregation interval.
//
// The loop variable is named period (as in publish) rather than interval so
// that it does not shadow the imported interval package inside this method.
func (a *Aggregator) processEvent(event *model.APMEvent) {
	for _, period := range a.Intervals {
		key := makeAggregationKey(event, period)
		a.active[period].storeOrUpdate(key, period, a.config.Logger)
	}
}

type metricsBuffer struct {
mu sync.RWMutex
m map[uint64][]*aggregationKey
mu sync.RWMutex
m map[uint64][]*aggregationKey
other *aggregationKey
otherCardinalityEstimator *hyperloglog.Sketch

// Number of aggregation keys in m, excluding overflow bucket.
entries int

maxSize int
}
Expand All @@ -182,13 +202,15 @@ func newMetricsBuffer(maxSize int) *metricsBuffer {

func (mb *metricsBuffer) storeOrUpdate(
key aggregationKey,
interval time.Duration,
logger *logp.Logger,
) bool {
) {
hash := key.hash()

mb.mu.Lock()
defer mb.mu.Unlock()

// Search in hash table with separate chaining.
entries, ok := mb.m[hash]
if ok {
ok = false
Expand All @@ -200,21 +222,24 @@ func (mb *metricsBuffer) storeOrUpdate(
}
}

if !ok {
n := len(mb.m)
half := mb.maxSize / 2

switch n {
case mb.maxSize:
return false
case half - 1:
logger.Warn("service summary groups reached 50% capacity")
case mb.maxSize - 1:
logger.Warn("service summary groups reached 100% capacity")
if ok {
return
}

if mb.entries >= mb.maxSize {
if mb.otherCardinalityEstimator == nil {
logger.Warnf(`
Service summary aggregation group limit of %d reached, new metric documents will be grouped
under a dedicated bucket identified by service name '%s'.`[1:], mb.maxSize, overflowServiceName)
key = makeOverflowAggregationKey(interval)
mb.other = &key
mb.otherCardinalityEstimator = hyperloglog.New14()
}
mb.otherCardinalityEstimator.InsertHash(hash)
} else {
mb.m[hash] = append(mb.m[hash], &key)
mb.entries++
}
return true
}

type aggregationKey struct {
Expand Down Expand Up @@ -263,6 +288,20 @@ func makeAggregationKey(event *model.APMEvent, interval time.Duration) aggregati
return key
}

// makeOverflowAggregationKey returns the aggregation key for the overflow
// bucket: its service name is overflowServiceName and its timestamp is the
// current wall-clock time truncated to interval.
func makeOverflowAggregationKey(interval time.Duration) aggregationKey {
	// The timestamp is derived from time.Now rather than from any event so
	// that the overflow document is aligned to evaluation time. Otherwise an
	// overflowing event carrying a stale timestamp would cause the indexed
	// overflow document to carry that stale timestamp as well.
	var key aggregationKey
	key.comparable.timestamp = time.Now().Truncate(interval)
	key.comparable.serviceName = overflowServiceName
	return key
}

func makeMetricset(key aggregationKey, interval string) model.APMEvent {
return model.APMEvent{
Timestamp: key.timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,60 @@ func TestAggregateTimestamp(t *testing.T) {
assert.Equal(t, t0.Add(30*time.Second), metricsets[1].Timestamp)
}

// TestAggregatorOverflow feeds the aggregator more distinct services than
// MaxGroups allows and checks that exactly one extra metricset, for service
// "_other", is published and that it carries the
// service_summary.aggregation.overflow_count sample.
func TestAggregatorOverflow(t *testing.T) {
	maxGrps := 4
	overflowCount := 100
	txnDuration := 100 * time.Millisecond
	batches := make(chan model.Batch, 1)
	agg, err := NewAggregator(AggregatorConfig{
		BatchProcessor: makeChanBatchProcessor(batches),
		Interval:       10 * time.Second,
		MaxGroups:      maxGrps,
	})
	require.NoError(t, err)

	// One transaction per distinct service name: the first maxGrps fill the
	// regular groups and the remaining overflowCount must overflow.
	batch := make(model.Batch, maxGrps+overflowCount)
	for i := range batch {
		batch[i] = makeTransaction(
			fmt.Sprintf("svc%d", i), "agent", "tx_type", "success", txnDuration, 1,
		)
	}

	// Use assert rather than require here: require calls t.FailNow, which
	// must only be called from the goroutine running the test function.
	go func() {
		assert.NoError(t, agg.Run())
	}()
	require.NoError(t, agg.ProcessBatch(context.Background(), &batch))
	require.NoError(t, agg.Stop(context.Background()))

	metricsets := batchMetricsets(t, expectBatch(t, batches))
	require.Len(t, metricsets, maxGrps+1) // only one `other` metric should overflow

	var overflowEvent *model.APMEvent
	for i := range metricsets {
		if metricsets[i].Service.Name != "_other" {
			continue
		}
		if overflowEvent != nil {
			require.Fail(t, "only one service should overflow")
		}
		overflowEvent = &metricsets[i]
	}
	// Fail with a clear message instead of panicking on a nil dereference
	// when no overflow metricset was published.
	require.NotNil(t, overflowEvent, "expected an overflow metricset for service _other")

	assert.Empty(t, cmp.Diff(model.APMEvent{
		Service: model.Service{
			Name: "_other",
		},
		Processor: model.MetricsetProcessor,
		Metricset: &model.Metricset{
			Name:     "service_summary",
			Interval: "10s",
			Samples: []model.MetricsetSample{
				{
					Name:  "service_summary.aggregation.overflow_count",
					Value: float64(overflowCount),
				},
			},
		},
	}, *overflowEvent, cmpopts.IgnoreTypes(netip.Addr{}, time.Time{})))
}

func makeTransaction(
serviceName, agentName, transactionType, outcome string,
duration time.Duration, count float64,
Expand Down