Skip to content

Commit 1dce068

Browse files
committed
Fix service summary overflow bucket regression
Fix regression introduced in elastic#10061 where service summary overflow bucket did not work.
1 parent 7c365e4 commit 1dce068

File tree

4 files changed

+396
-0
lines changed

4 files changed

+396
-0
lines changed

apmpackage/apm/data_stream/service_summary_interval_metrics/elasticsearch/ingest_pipeline/default.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ processors:
77
name: observer_ids
88
- pipeline:
99
name: ecs_version
10+
- remove:
11+
field: _metric_descriptions
12+
ignore_missing: true

systemtest/aggregation_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,51 @@ func TestServiceSummaryMetricsAggregation(t *testing.T) {
328328
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits)
329329
}
330330

331+
func TestServiceSummaryMetricsAggregationOverflow(t *testing.T) {
332+
systemtest.CleanupElasticsearch(t)
333+
srv := apmservertest.NewUnstartedServerTB(t)
334+
srv.Config.Aggregation = &apmservertest.AggregationConfig{
335+
ServiceTransactionMaxGroups: 2,
336+
}
337+
require.NoError(t, srv.Start())
338+
339+
f := func(env string) {
340+
t.Setenv("ELASTIC_APM_ENVIRONMENT", env)
341+
timestamp, _ := time.Parse(time.RFC3339Nano, "2006-01-02T15:04:05.999999999Z") // should be truncated to 1s
342+
tracer := srv.Tracer()
343+
tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{Start: timestamp})
344+
tx.Duration = time.Second
345+
tx.End()
346+
tracer.Flush(nil)
347+
}
348+
f("dev")
349+
f("staging")
350+
f("prod")
351+
f("test")
352+
353+
// Wait for the transaction to be indexed, indicating that Elasticsearch
354+
// indices have been setup and we should not risk triggering the shutdown
355+
// timeout while waiting for the aggregated metrics to be indexed.
356+
systemtest.Elasticsearch.ExpectMinDocs(t, 4, "traces-apm*",
357+
estest.TermQuery{Field: "processor.event", Value: "transaction"},
358+
)
359+
// Stop server to ensure metrics are flushed on shutdown.
360+
assert.NoError(t, srv.Close())
361+
systemtest.Elasticsearch.ExpectMinDocs(t, 3, "metrics-apm.service_summary*",
362+
estest.BoolQuery{
363+
Must: []interface{}{
364+
estest.TermQuery{Field: "metricset.name", Value: "service_summary"},
365+
estest.TermQuery{Field: "service.name", Value: "_other"},
366+
},
367+
},
368+
)
369+
result := systemtest.Elasticsearch.ExpectMinDocs(t, 9, "metrics-apm.service_summary*",
370+
estest.TermQuery{Field: "metricset.name", Value: "service_summary"},
371+
)
372+
// Ignore timestamp because overflow bucket uses time.Now()
373+
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")
374+
}
375+
331376
func TestNonDefaultRollupIntervalHiddenDataStream(t *testing.T) {
332377
systemtest.CleanupElasticsearch(t)
333378
srv := apmservertest.NewServerTB(t)

systemtest/apmservertest/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Config struct {
7272

7373
// ProfilingConfig holds configuration related to profiling.
7474
Profiling *ProfilingConfig `json:"apm-server.profiling,omitempty"`
75+
76+
// AggregationConfig holds configuration related to aggregation.
77+
Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"`
7578
}
7679

7780
// Args formats cfg as a list of arguments to pass to apm-server,
@@ -320,6 +323,11 @@ func durationString(d time.Duration) string {
320323
return d.String()
321324
}
322325

326+
// AggregationConfig holds configuration related to aggregation.
327+
type AggregationConfig struct {
328+
ServiceTransactionMaxGroups int `json:"service_transactions.max_groups,omitempty"`
329+
}
330+
323331
func configArgs(cfg Config, extra map[string]interface{}) ([]string, error) {
324332
kv, err := flattenConfig(cfg, extra)
325333
if err != nil {

0 commit comments

Comments
 (0)