@@ -26,7 +26,10 @@ type accumulatedValue struct {
26
26
// updated indicates when metric was last changed.
27
27
updated time.Time
28
28
29
- scope pcommon.InstrumentationScope
29
+ scopeName string
30
+ scopeVersion string
31
+ scopeSchemaURL string
32
+ scopeAttributes pcommon.Map
30
33
}
31
34
32
35
// accumulator stores aggregated values of incoming metrics
@@ -35,7 +38,7 @@ type accumulator interface {
35
38
Accumulate (resourceMetrics pmetric.ResourceMetrics ) (processed int )
36
39
// Collect returns a slice with relevant aggregated metrics and their resource attributes.
37
40
// The number or metrics and attributes returned will be the same.
38
- Collect () (metrics []pmetric.Metric , resourceAttrs []pcommon.Map )
41
+ Collect () (metrics []pmetric.Metric , resourceAttrs []pcommon.Map , scopeNames [] string , scopeVersions [] string , scopeSchemaURLs [] string , scopeAttributes []pcommon. Map )
39
42
}
40
43
41
44
// LastValueAccumulator keeps last value for accumulated metrics
@@ -68,25 +71,25 @@ func (a *lastValueAccumulator) Accumulate(rm pmetric.ResourceMetrics) (n int) {
68
71
69
72
metrics := ilm .Metrics ()
70
73
for j := 0 ; j < metrics .Len (); j ++ {
71
- n += a .addMetric (metrics .At (j ), ilm .Scope (), resourceAttrs , now )
74
+ n += a .addMetric (metrics .At (j ), ilm .Scope (). Name (), ilm . Scope (). Version (), ilm . SchemaUrl (), ilm . Scope (). Attributes () , resourceAttrs , now )
72
75
}
73
76
}
74
77
75
78
return
76
79
}
77
80
78
- func (a * lastValueAccumulator ) addMetric (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) int {
81
+ func (a * lastValueAccumulator ) addMetric (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) int {
79
82
a .logger .Debug (fmt .Sprintf ("accumulating metric: %s" , metric .Name ()))
80
83
81
84
switch metric .Type () {
82
85
case pmetric .MetricTypeGauge :
83
- return a .accumulateGauge (metric , il , resourceAttrs , now )
86
+ return a .accumulateGauge (metric , scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , resourceAttrs , now )
84
87
case pmetric .MetricTypeSum :
85
- return a .accumulateSum (metric , il , resourceAttrs , now )
88
+ return a .accumulateSum (metric , scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , resourceAttrs , now )
86
89
case pmetric .MetricTypeHistogram :
87
- return a .accumulateHistogram (metric , il , resourceAttrs , now )
90
+ return a .accumulateHistogram (metric , scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , resourceAttrs , now )
88
91
case pmetric .MetricTypeSummary :
89
- return a .accumulateSummary (metric , il , resourceAttrs , now )
92
+ return a .accumulateSummary (metric , scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , resourceAttrs , now )
90
93
default :
91
94
a .logger .With (
92
95
zap .String ("data_type" , string (metric .Type ())),
@@ -97,12 +100,12 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.Instr
97
100
return 0
98
101
}
99
102
100
- func (a * lastValueAccumulator ) accumulateSummary (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
103
+ func (a * lastValueAccumulator ) accumulateSummary (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
101
104
dps := metric .Summary ().DataPoints ()
102
105
for i := 0 ; i < dps .Len (); i ++ {
103
106
ip := dps .At (i )
104
107
105
- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
108
+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
106
109
if ip .Flags ().NoRecordedValue () {
107
110
a .registeredMetrics .Delete (signature )
108
111
return 0
@@ -119,19 +122,19 @@ func (a *lastValueAccumulator) accumulateSummary(metric pmetric.Metric, il pcomm
119
122
120
123
m := copyMetricMetadata (metric )
121
124
ip .CopyTo (m .SetEmptySummary ().DataPoints ().AppendEmpty ())
122
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
125
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
123
126
n ++
124
127
}
125
128
126
129
return n
127
130
}
128
131
129
- func (a * lastValueAccumulator ) accumulateGauge (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
132
+ func (a * lastValueAccumulator ) accumulateGauge (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
130
133
dps := metric .Gauge ().DataPoints ()
131
134
for i := 0 ; i < dps .Len (); i ++ {
132
135
ip := dps .At (i )
133
136
134
- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
137
+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
135
138
if ip .Flags ().NoRecordedValue () {
136
139
a .registeredMetrics .Delete (signature )
137
140
return 0
@@ -141,7 +144,7 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
141
144
if ! ok {
142
145
m := copyMetricMetadata (metric )
143
146
ip .CopyTo (m .SetEmptyGauge ().DataPoints ().AppendEmpty ())
144
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
147
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
145
148
n ++
146
149
continue
147
150
}
@@ -154,13 +157,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
154
157
155
158
m := copyMetricMetadata (metric )
156
159
ip .CopyTo (m .SetEmptyGauge ().DataPoints ().AppendEmpty ())
157
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
160
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
158
161
n ++
159
162
}
160
163
return
161
164
}
162
165
163
- func (a * lastValueAccumulator ) accumulateSum (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
166
+ func (a * lastValueAccumulator ) accumulateSum (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
164
167
doubleSum := metric .Sum ()
165
168
166
169
// Drop metrics with unspecified aggregations
@@ -177,7 +180,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
177
180
for i := 0 ; i < dps .Len (); i ++ {
178
181
ip := dps .At (i )
179
182
180
- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
183
+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
181
184
if ip .Flags ().NoRecordedValue () {
182
185
a .registeredMetrics .Delete (signature )
183
186
return 0
@@ -189,7 +192,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
189
192
m .SetEmptySum ().SetIsMonotonic (metric .Sum ().IsMonotonic ())
190
193
m .Sum ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
191
194
ip .CopyTo (m .Sum ().DataPoints ().AppendEmpty ())
192
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
195
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
193
196
n ++
194
197
continue
195
198
}
@@ -215,21 +218,21 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
215
218
m .SetEmptySum ().SetIsMonotonic (metric .Sum ().IsMonotonic ())
216
219
m .Sum ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
217
220
ip .CopyTo (m .Sum ().DataPoints ().AppendEmpty ())
218
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
221
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
219
222
n ++
220
223
}
221
224
return
222
225
}
223
226
224
- func (a * lastValueAccumulator ) accumulateHistogram (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
227
+ func (a * lastValueAccumulator ) accumulateHistogram (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
225
228
histogram := metric .Histogram ()
226
229
a .logger .Debug ("Accumulate histogram....." )
227
230
dps := histogram .DataPoints ()
228
231
229
232
for i := 0 ; i < dps .Len (); i ++ {
230
233
ip := dps .At (i )
231
234
232
- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs ) // uniquely identify this time series you are accumulating for
235
+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaURL , scopeAttributes , metric , ip .Attributes (), resourceAttrs ) // uniquely identify this time series you are accumulating for
233
236
if ip .Flags ().NoRecordedValue () {
234
237
a .registeredMetrics .Delete (signature )
235
238
return 0
@@ -241,7 +244,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
241
244
m := copyMetricMetadata (metric )
242
245
ip .CopyTo (m .SetEmptyHistogram ().DataPoints ().AppendEmpty ())
243
246
m .Histogram ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
244
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
247
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
245
248
n ++
246
249
continue
247
250
}
@@ -284,18 +287,22 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
284
287
// unsupported temporality
285
288
continue
286
289
}
287
- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
290
+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaURL : scopeSchemaURL , scopeAttributes : scopeAttributes , updated : now })
288
291
n ++
289
292
}
290
293
return
291
294
}
292
295
293
296
// Collect returns a slice with relevant aggregated metrics and their resource attributes.
294
- func (a * lastValueAccumulator ) Collect () ([]pmetric.Metric , []pcommon.Map ) {
297
+ func (a * lastValueAccumulator ) Collect () ([]pmetric.Metric , []pcommon.Map , [] string , [] string , [] string , []pcommon. Map ) {
295
298
a .logger .Debug ("Accumulator collect called" )
296
299
297
300
var metrics []pmetric.Metric
298
301
var resourceAttrs []pcommon.Map
302
+ var scopeNames []string
303
+ var scopeVersions []string
304
+ var scopeSchemaURLs []string
305
+ var scopeAttributes []pcommon.Map
299
306
expirationTime := time .Now ().Add (- a .metricExpiration )
300
307
301
308
a .registeredMetrics .Range (func (key , value any ) bool {
@@ -308,23 +315,38 @@ func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map) {
308
315
309
316
metrics = append (metrics , v .value )
310
317
resourceAttrs = append (resourceAttrs , v .resourceAttrs )
318
+ scopeNames = append (scopeNames , v .scopeName )
319
+ scopeVersions = append (scopeVersions , v .scopeVersion )
320
+ scopeSchemaURLs = append (scopeSchemaURLs , v .scopeSchemaURL )
321
+ scopeAttributes = append (scopeAttributes , v .scopeAttributes )
311
322
return true
312
323
})
313
324
314
- return metrics , resourceAttrs
325
+ return metrics , resourceAttrs , scopeNames , scopeVersions , scopeSchemaURLs , scopeAttributes
315
326
}
316
327
317
- func timeseriesSignature (ilmName string , metric pmetric.Metric , attributes pcommon.Map , resourceAttrs pcommon.Map ) string {
328
+ func timeseriesSignature (scopeName string , scopeVersion string , scopeSchemaURL string , scopeAttributes pcommon. Map , metric pmetric.Metric , attributes pcommon.Map , resourceAttrs pcommon.Map ) string {
318
329
var b strings.Builder
319
- b .WriteString (metric .Type ().String ())
320
- b .WriteString ("*" + ilmName )
321
330
b .WriteString ("*" + metric .Name ())
322
- attrs := make ([]string , 0 , attributes .Len ())
331
+ b .WriteString (metric .Type ().String ())
332
+ b .WriteString ("*" + scopeName )
333
+ b .WriteString ("*" + scopeVersion )
334
+ b .WriteString ("*" + scopeSchemaURL )
335
+
336
+ attrs := make ([]string , 0 , scopeAttributes .Len ())
337
+ for k , v := range scopeAttributes .All () {
338
+ attrs = append (attrs , k + "*" + v .AsString ())
339
+ }
340
+ sort .Strings (attrs )
341
+ b .WriteString ("*" + strings .Join (attrs , "*" ))
342
+
343
+ attrs = make ([]string , 0 , attributes .Len ())
323
344
for k , v := range attributes .All () {
324
345
attrs = append (attrs , k + "*" + v .AsString ())
325
346
}
326
347
sort .Strings (attrs )
327
348
b .WriteString ("*" + strings .Join (attrs , "*" ))
349
+
328
350
if job , ok := extractJob (resourceAttrs ); ok {
329
351
b .WriteString ("*" + model .JobLabel + "*" + job )
330
352
}
0 commit comments