Add overflow bucket to service summary #10061
Conversation
This pull request does not have a backport label. Could you fix it @carsonip? 🙏
This pull request is now in conflicts. Could you fix it @carsonip? 🙏
📚 Go benchmark report: diff with the report generated with https://pkg.go.dev/golang.org/x/perf/cmd/benchstat
Force-pushed 5c32fa2 to bc9f72b
Force-pushed bc9f72b to d521aec
Changes largely look good, just a bit more alignment with how this is handled in the other aggregations.
Review thread on x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go (outdated, resolved)
@simitt @axw Same question for service destination: currently, service destination does not have the backing array, but just a metrics buffer map holding pointers to metricsMapEntry structs all over the heap. There is no need to reserve memory up front, but it is probably bad for GC and access time.
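For illustration, here is a minimal sketch of the two layouts under discussion. The field names, record helper, and sizes are hypothetical stand-ins, not the actual aggregator code:

package main

import "fmt"

// metricsMapEntry is a hypothetical stand-in for the aggregator's map entry.
type metricsMapEntry struct {
	key   string
	count uint64
}

// Option A (current service destination): map of pointers. Each entry is a
// separate heap allocation scattered across the heap — cheap up front, but
// more objects for the GC to trace and worse cache locality.
type pointerBuffer struct {
	m map[string]*metricsMapEntry
}

func (b *pointerBuffer) record(key string) {
	e, ok := b.m[key]
	if !ok {
		e = &metricsMapEntry{key: key} // one allocation per new group
		b.m[key] = e
	}
	e.count++
}

// Option B (mb.space-style backing array): entries live in one contiguous
// slice reserved up front; the map only holds indices into it.
type arrayBuffer struct {
	space []metricsMapEntry // preallocated to the max number of groups
	m     map[string]int
}

func (b *arrayBuffer) record(key string) {
	i, ok := b.m[key]
	if !ok {
		b.space = append(b.space, metricsMapEntry{key: key}) // reuses reserved capacity
		i = len(b.space) - 1
		b.m[key] = i
	}
	b.space[i].count++
}

func main() {
	p := &pointerBuffer{m: make(map[string]*metricsMapEntry)}
	a := &arrayBuffer{space: make([]metricsMapEntry, 0, 1000), m: make(map[string]int)}
	p.record("svc-a")
	a.record("svc-a")
	fmt.Println(p.m["svc-a"].count, a.space[a.m["svc-a"]].count) // 1 1
}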
@carsonip could you provide micro-benchmarks on both options?
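A benchmark along these lines could answer that. This sketch assumes the hypothetical pointerBuffer/arrayBuffer types from the layout sketch above, placed in a _test.go file in the same package:

package main

import (
	"strconv"
	"testing"
)

// Run with: go test -bench=Buffer -benchmem -count=10, then feed the two
// outputs to benchstat to compare ns/op, B/op, and allocs/op.

func BenchmarkPointerBuffer(b *testing.B) {
	buf := &pointerBuffer{m: make(map[string]*metricsMapEntry)}
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		buf.record("service-" + strconv.Itoa(i%1000))
	}
}

func BenchmarkArrayBuffer(b *testing.B) {
	buf := &arrayBuffer{
		space: make([]metricsMapEntry, 0, 1000), // reserve up front
		m:     make(map[string]int),
	}
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		buf.record("service-" + strconv.Itoa(i%1000))
	}
}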
This pull request is now in conflicts. Could you fix it @carsonip? 🙏
(updated math, my previous math was VERY wrong)
We already have a backing array for service metrics. Side note: since a map entry contains labels, which contain slices, we may retain references to more objects on the heap and keep more memory in use than just 136B per entry. I benchmarked both options: with the backing array mb.space, and without it, just keeping addresses of structs on the heap.
IMO, if reserving memory for this purpose doesn't seem wise, we can implement the version without the backing array first, and optimize it later if it becomes an issue.
@simitt I've removed the mb.space backing array optimization for now to unblock this first. We can add optimizations later on.
Reserving 13.6KB on a 1GB machine, and 870KB on a 64GB machine, is not even remotely concerning to me. Generating a lot of garbage is. Nevertheless, I'm fine with keeping it simple to start with and optimising later - it's fairly insignificant in the grand scheme of things.
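For scale: reading 13.6KB as 13,600 bytes and 870KB as 870,400 bytes (an assumption), and taking the 136B-per-entry figure from earlier in the thread, the quoted reservations work out to:

package main

import "fmt"

func main() {
	const entryBytes = 136            // per-entry size quoted earlier in the thread
	fmt.Println(13_600 / entryBytes)  // 1GB machine: 100 reserved entries
	fmt.Println(870_400 / entryBytes) // 64GB machine: 6400 reserved entries
}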
I fully agree with @axw, these numbers are very small and I'd prefer moving forward with the backing array, but it's also fine with me to re-add it later.
Updated my math on memory usage because it was very wrong. But let's merge the changes in first and optimize the performance later. There are other aggregators that can use this optimization as well.
@carsonip I tried to test this with 8.7.0 BC2 on ESS but it doesn't seem to be working. I noticed some error messages around the time I was triggering the overflow that make me think we forgot to remove the old code path (lines 10 to 12 in 7c365e4, vs lines 1 to 9 in 7c365e4).
I used the following program:
package main
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"sync"
	"time"

	"go.elastic.co/apm/v2/model"
	"go.elastic.co/fastjson"
)

var host = os.Getenv("ELASTIC_APM_SERVER_URL")
var auth = os.Getenv("ELASTIC_APM_SECRET_TOKEN")

var (
	language = model.Language{Name: "go", Version: runtime.Version()}
	rtime    = model.Runtime{Name: runtime.Compiler, Version: runtime.Version()}
	agent    = model.Agent{Name: "go", Version: "2.2.0"}
)

// main starts one goroutine per simulated service; each goroutine sends
// batches of transactions to the APM Server, once or on a ticker.
func main() {
	txGroups := 10
	if groups := os.Getenv("TX_GROUPS"); groups != "" {
		if v, _ := strconv.ParseInt(groups, 10, 64); v > 0 {
			txGroups = int(v)
		}
	}
	services := 1
	if groups := os.Getenv("SERVICES"); groups != "" {
		if v, _ := strconv.ParseInt(groups, 10, 64); v > 0 {
			services = int(v)
		}
	}
	var frequency time.Duration = 0 // <= 0 means send once and exit
	if str := os.Getenv("FREQUENCY"); str != "" {
		if v, err := time.ParseDuration(str); err == nil {
			frequency = v
		}
	}
	var wg sync.WaitGroup
	log.Printf("starting load generator with frequency %s", frequency)
	for i := 0; i < services; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			var ticker *time.Ticker
			if frequency > 0 {
				ticker = time.NewTicker(frequency)
			}
			for {
				if ticker != nil {
					<-ticker.C
				}
				var w fastjson.Writer
				// Pick a random batch size in [10, 100); metadata is rewritten
				// (and the buffer flushed) every batchSize events.
				var batchSize int
				for batchSize < 10 {
					batchSize = rand.Intn(100)
				}
				for i := 0; i < txGroups; i++ {
					if i%batchSize == 0 {
						if i > 0 {
							flushService(&w, auth)
						}
						writeMetadata(&w, id)
					}
					txID := i
					writeTransaction(&w, txID, id)
				}
				// Flush whatever is still buffered from the final partial batch.
				flushService(&w, auth)
				if ticker == nil {
					break
				}
			}
		}(i)
	}
	wg.Wait()
	log.Println("done")
}

// writeMetadata starts a new intake v2 payload with a metadata object
// describing a fake service whose name is derived from the goroutine id.
func writeMetadata(json *fastjson.Writer, id int) error {
	json.RawString(`{"metadata":`)
	json.RawString(`{"service":`)
	svc := model.Service{
		Name:        fmt.Sprint("service-", id),
		Language:    &language,
		Runtime:     &rtime,
		Agent:       &agent,
		Environment: "dev",
		Node: &model.ServiceNode{
			ConfiguredName: "service-node",
		},
	}
	if err := svc.MarshalFastJSON(json); err != nil {
		return err
	}
	json.RawByte('}')
	json.RawString("}\n")
	return nil
}

// writeTransaction appends a single sampled transaction event with random
// trace and transaction IDs.
func writeTransaction(w *fastjson.Writer, txID, id int) {
	t := true
	sr := 1.0
	tx := model.Transaction{
		Timestamp:  model.Time(time.Now()),
		Name:       fmt.Sprintf("tx-%d", txID),
		Type:       "txtype",
		Duration:   10 + float64(id),
		Outcome:    "success",
		Result:     "HTTP 2xx",
		Sampled:    &t,
		SampleRate: &sr,
	}
	binary.LittleEndian.PutUint64(tx.TraceID[:8], rand.Uint64())
	binary.LittleEndian.PutUint64(tx.TraceID[8:], rand.Uint64())
	binary.LittleEndian.PutUint64(tx.ID[:], rand.Uint64())
	w.RawString(`{"transaction":`)
	tx.MarshalFastJSON(w)
	w.RawString("}\n")
}

// flushService posts the buffered NDJSON payload to the intake v2 endpoint
// and resets the writer for the next batch.
func flushService(w *fastjson.Writer, auth string) {
	buf := make([]byte, w.Size())
	copy(buf, w.Bytes())
	w.Reset()
	req, err := http.NewRequest(http.MethodPost,
		host+"/intake/v2/events",
		bytes.NewReader(buf),
	)
	if err != nil {
		panic(err)
	}
	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", auth))
	req.Header.Add("Content-type", "application/x-ndjson")
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	if res.StatusCode >= 400 {
		io.Copy(os.Stdout, res.Body)
	}
}

Run it with the following script:

#!/bin/bash
export ELASTIC_APM_SERVER_URL=${1:-"http://localhost:8200"}
export TX_GROUPS=1
export SERVICES=3000
go run main.go
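(With TX_GROUPS=1 and SERVICES=3000, each fake service sends a single transaction group, so it is the number of distinct services, rather than transaction groups, that should exceed the service group limit and spill into the overflow bucket.)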
@marclop thanks for testing this. I'll add a system test and make sure this is fixed.
Since service groups are autosized, unless we duplicate the auto-sizing logic in the system test, we can't rely on a constant and be sure that it will overflow. The best approach IMO will be to wait for #10207, which adds metrics on overflow, so that it is observable from the outside whether it has overflowed. @lahsivjar what do you think?
@carsonip I am not sure I understand the issue with adding the system test correctly.
Based on what I understand, in the system test we can disable the auto-sizing by setting the specific configuration values to a positive number.
We can then assert this reliably by checking for documents with service.name: "_other".
Ah, you're right. I'll proceed with setting the config and checking for service.name: "_other".
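For reference, a minimal sketch of that assertion as plain HTTP against Elasticsearch, outside any test framework. The index pattern and field values are taken from the verification output below; the ES_URL environment variable and the error handling are illustrative assumptions:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
)

func main() {
	es := os.Getenv("ES_URL") // hypothetical, e.g. https://user:pass@host:9243
	// Search the service summary data stream for overflow documents.
	body := `{"query":{"term":{"service.name":"_other"}}}`
	req, err := http.NewRequest(http.MethodPost,
		es+"/metrics-apm.service_summary.1m-*/_search",
		strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	out, _ := io.ReadAll(res.Body)
	// Hits should contain service_summary.aggregation.overflow_count.
	fmt.Println(string(out))
}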
Fixing in #10302
Fix regression introduced in #10061 where service summary overflow bucket did not work. (cherry picked from commit 4ca4bb1) Co-authored-by: Carson Ip <[email protected]>
@carsonip I'm re-testing this and still seeing the same issue; the logs still show that error.
There's a stale PR blocking the 8.7 branch updates (elastic/kibana#150647); that would be a good explanation as to why this isn't working with the latest BC (BC4).
Verified with BC5 on ESS:
{
"hits": {
"hits": [
{
"_index": ".ds-metrics-apm.service_summary.1m-default-2023.03.10-000001",
"_id": "Cm_8yoYBePWpGPOHo5_2",
"_score": 7.379257,
"_source": {
"@timestamp": "2023-03-10T10:05:00.000Z",
"data_stream": {
"dataset": "apm.service_summary.1m",
"namespace": "default",
"type": "metrics"
},
"ecs": {
"version": "8.6.0-dev"
},
"event": {
"agent_id_status": "missing",
"ingested": "2023-03-10T10:06:40Z"
},
"metricset": {
"interval": "1m",
"name": "service_summary"
},
"observer": {
"hostname": "c9e482615886",
"type": "apm-server",
"version": "8.7.0"
},
"processor": {
"event": "metric",
"name": "metric"
},
"service": {
"name": "_other"
},
"service_summary": {
"aggregation": {
"overflow_count": 500
}
}
}
},
{
"_index": ".ds-metrics-apm.service_summary.1m-default-2023.03.10-000001",
"_id": "c2_7yoYBePWpGPOHuY2N",
"_score": 7.379257,
"_source": {
"@timestamp": "2023-03-10T10:04:00.000Z",
"data_stream": {
"dataset": "apm.service_summary.1m",
"namespace": "default",
"type": "metrics"
},
"ecs": {
"version": "8.6.0-dev"
},
"event": {
"agent_id_status": "missing",
"ingested": "2023-03-10T10:05:40Z"
},
"metricset": {
"interval": "1m",
"name": "service_summary"
},
"observer": {
"hostname": "c9e482615886",
"type": "apm-server",
"version": "8.7.0"
},
"processor": {
"event": "metric",
"name": "metric"
},
"service": {
"name": "_other"
},
"service_summary": {
"aggregation": {
"overflow_count": 100
}
}
}
}
]
}
}
Motivation/summary
Add an overflow bucket to service summary metrics, to align with transaction metrics and service metrics.

Checklist
- [ ] … (only if changes to apmpackage have been made)
- [ ] Documentation has been updated

How to test these changes
Exceed the service group limit and check for documents with service_summary.aggregation.overflow_count.

Related issues
Part of #9654