Commit ff4e25a

Add size throughput metrics

1 parent e9f3dec commit ff4e25a

15 files changed: +614 -51 lines

service/internal/graph/connector.go

Lines changed: 36 additions & 8 deletions
@@ -126,7 +126,13 @@ func (n *connectorNode) buildTraces(
         if err != nil {
             return err
         }
-        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ConnectorConsumedItems),
+        }
+        if isEnabled(tb.ConnectorConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ConnectorConsumedSize))
+        }
+        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
     case xpipeline.SignalProfiles:
         n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
         if err != nil {
@@ -188,7 +194,13 @@ func (n *connectorNode) buildMetrics(
         if err != nil {
             return err
         }
-        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ConnectorConsumedItems),
+        }
+        if isEnabled(tb.ConnectorConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ConnectorConsumedSize))
+        }
+        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
     case xpipeline.SignalProfiles:
         n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
         if err != nil {
@@ -212,16 +224,20 @@ func (n *connectorNode) buildLogs(
 
     consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
     for _, next := range nexts {
-        consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
-            next.(consumer.Logs),
-            tb.ConnectorProducedItems,
+        producedOpts := []obsconsumer.Option{
             obsconsumer.WithStaticDataPointAttribute(
                 otelattr.String(
                     pipelineIDAttrKey,
                     next.(*capabilitiesNode).pipelineID.String(),
                 ),
             ),
-        )
+            obsconsumer.WithLogsItemCounter(tb.ConnectorProducedItems),
+        }
+        if isEnabled(tb.ConnectorProducedSize) {
+            producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(tb.ConnectorProducedSize))
+        }
+        consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
+            next.(consumer.Logs), producedOpts...)
     }
     next := connector.NewLogsRouter(consumers)
 
@@ -231,13 +247,19 @@ func (n *connectorNode) buildLogs(
         if err != nil {
             return err
         }
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ConnectorConsumedItems),
+        }
+        if isEnabled(tb.ConnectorConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ConnectorConsumedSize))
+        }
         // Connectors which might pass along data must inherit capabilities of all nexts
         n.consumer = obsconsumer.NewLogs(
             capabilityconsumer.NewLogs(
                 n.Component.(consumer.Logs),
                 aggregateCap(n.Component.(consumer.Logs), nexts),
             ),
-            tb.ConnectorConsumedItems,
+            consumedOpts...,
         )
     case pipeline.SignalTraces:
         n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
@@ -318,7 +340,13 @@ func (n *connectorNode) buildProfiles(
         if err != nil {
             return err
         }
-        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ConnectorConsumedItems),
+        }
+        if isEnabled(tb.ConnectorConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ConnectorConsumedSize))
+        }
+        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
     }
     return nil
 }
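
All of the Logs branches above follow the same pattern: the per-call item counter stays wired unconditionally, while the size counter is attached only when isEnabled reports that the instrument supports enablement checks, presumably because computing the serialized byte size of every payload is not free. The obsconsumer package itself is not part of the rendered diff, so the following is only a minimal sketch of what an option-based logs wrapper of this shape could look like; the exact item-counter option, the internal struct, and the use of the pdata ProtoMarshaler for byte counts are assumptions for illustration, not the package's confirmed implementation.

// Sketch only: an assumed shape for an option-based obsconsumer.NewLogs.
package obsconsumersketch

import (
    "context"

    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/pdata/plog"
    "go.opentelemetry.io/otel/metric"
)

type options struct {
    itemCounter metric.Int64Counter // e.g. ConnectorConsumedItems
    sizeCounter metric.Int64Counter // e.g. ConnectorConsumedSize, optional
}

// Option configures the wrapper; assumed to match the WithLogs*Counter calls above.
type Option func(*options)

func WithLogsItemCounter(c metric.Int64Counter) Option {
    return func(o *options) { o.itemCounter = c }
}

func WithLogsSizeCounter(c metric.Int64Counter) Option {
    return func(o *options) { o.sizeCounter = c }
}

// logsSizer reports the size of the protobuf encoding of a plog.Logs payload.
var logsSizer = &plog.ProtoMarshaler{}

type obsLogs struct {
    consumer.Logs
    opts options
}

// NewLogs wraps the next consumer and records throughput metrics around it.
func NewLogs(next consumer.Logs, opts ...Option) consumer.Logs {
    var o options
    for _, opt := range opts {
        opt(&o)
    }
    return &obsLogs{Logs: next, opts: o}
}

func (c *obsLogs) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
    if c.opts.itemCounter != nil {
        c.opts.itemCounter.Add(ctx, int64(ld.LogRecordCount()))
    }
    // Only pay for the size computation when a size counter was wired in,
    // which the graph code above gates with isEnabled.
    if c.opts.sizeCounter != nil {
        c.opts.sizeCounter.Add(ctx, int64(logsSizer.LogsSize(ld)))
    }
    return c.Logs.ConsumeLogs(ctx, ld)
}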

service/internal/graph/enabled.go

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package graph // import "go.opentelemetry.io/collector/service/internal/graph"
+
+import (
+    "context"
+
+    "go.opentelemetry.io/otel/metric"
+)
+
+type enabledInstrument interface {
+    Enabled(context.Context) bool
+}
+
+func isEnabled(inst metric.Int64Counter) bool {
+    _, ok := inst.(enabledInstrument)
+    return ok
+}
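
As written, isEnabled only reports whether a counter exposes the optional Enabled(context.Context) bool hook; consulting the enabled state itself would presumably happen where the counter is recorded. A hypothetical illustration of what the type assertion distinguishes, using a wrapper type that is not part of this commit:

// Hypothetical example only: gatedCounter adds the optional Enabled hook to
// any Int64Counter so that isEnabled can detect it. It would live alongside
// enabled.go in the graph package.
package graph

import (
    "context"

    "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/metric/noop"
)

type gatedCounter struct {
    metric.Int64Counter
    on bool
}

func (g gatedCounter) Enabled(context.Context) bool { return g.on }

func exampleIsEnabled() {
    plain := noop.Int64Counter{}                          // no Enabled hook
    gated := gatedCounter{Int64Counter: plain, on: true}  // exposes the hook

    _ = isEnabled(plain) // false: the instrument cannot report enablement
    _ = isEnabled(gated) // true: the graph will wire up the size option
}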

service/internal/graph/exporter.go

Lines changed: 7 additions & 1 deletion
@@ -79,7 +79,13 @@ func (n *exporterNode) buildComponent(
         if err != nil {
             return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
         }
-        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ExporterConsumedItems)
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ExporterConsumedItems),
+        }
+        if isEnabled(tb.ExporterConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ExporterConsumedSize))
+        }
+        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
     case xpipeline.SignalProfiles:
         n.Component, err = builder.CreateProfiles(ctx, set)
         if err != nil {

service/internal/graph/processor.go

Lines changed: 14 additions & 2 deletions
@@ -77,12 +77,24 @@ func (n *processorNode) buildComponent(ctx context.Context,
         }
         n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ProcessorConsumedItems)
     case pipeline.SignalLogs:
+        producedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ProcessorProducedItems),
+        }
+        if isEnabled(tb.ProcessorProducedSize) {
+            producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(tb.ProcessorProducedSize))
+        }
         n.Component, err = builder.CreateLogs(ctx, set,
-            obsconsumer.NewLogs(next.(consumer.Logs), tb.ProcessorProducedItems))
+            obsconsumer.NewLogs(next.(consumer.Logs), producedOpts...))
         if err != nil {
             return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
         }
-        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ProcessorConsumedItems)
+        consumedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ProcessorConsumedItems),
+        }
+        if isEnabled(tb.ProcessorConsumedSize) {
+            consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(tb.ProcessorConsumedSize))
+        }
+        n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
     case xpipeline.SignalProfiles:
         n.Component, err = builder.CreateProfiles(ctx, set,
             obsconsumer.NewProfiles(next.(xconsumer.Profiles), tb.ProcessorProducedItems))

service/internal/graph/receiver.go

Lines changed: 7 additions & 1 deletion
@@ -75,8 +75,14 @@ func (n *receiverNode) buildComponent(ctx context.Context,
         for _, next := range nexts {
             consumers = append(consumers, next.(consumer.Logs))
         }
+        producedOpts := []obsconsumer.Option{
+            obsconsumer.WithLogsItemCounter(tb.ReceiverProducedItems),
+        }
+        if isEnabled(tb.ReceiverProducedSize) {
+            producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(tb.ReceiverProducedSize))
+        }
         n.Component, err = builder.CreateLogs(ctx, set,
-            obsconsumer.NewLogs(fanoutconsumer.NewLogs(consumers), tb.ReceiverProducedItems))
+            obsconsumer.NewLogs(fanoutconsumer.NewLogs(consumers), producedOpts...))
     case xpipeline.SignalProfiles:
         var consumers []xconsumer.Profiles
         for _, next := range nexts {
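
Only the Logs branches appear in the rendered hunks, but the same wiring presumably applies per signal once the corresponding size counters exist. The diff also does not show how the byte counts themselves are produced; a plausible basis, stated here as an assumption rather than something shown in this commit, is the pdata ProtoMarshaler sizers, which report the protobuf-encoded size of a payload:

// Assumed mechanism: pdata sizers give the encoded byte size of a payload,
// complementing the existing item counts (spans, data points, log records).
package sizesketch

import (
    "go.opentelemetry.io/collector/pdata/plog"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/pdata/ptrace"
)

var (
    tracesSizer  = &ptrace.ProtoMarshaler{}
    metricsSizer = &pmetric.ProtoMarshaler{}
    logsSizer    = &plog.ProtoMarshaler{}
)

// payloadBytes returns the encoded sizes that a size throughput metric would record.
func payloadBytes(td ptrace.Traces, md pmetric.Metrics, ld plog.Logs) (int, int, int) {
    return tracesSizer.TracesSize(td), metricsSizer.MetricsSize(md), logsSizer.LogsSize(ld)
}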

service/internal/metadata/generated_telemetry.go

Lines changed: 42 additions & 0 deletions
Some generated files are not rendered by default.
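
A size counter registered by the generated telemetry code would presumably go through the regular OTel metric API. The snippet below is a hypothetical stand-in only, since the generated file is not rendered here; the metric name, unit, and description are assumed.

// Hypothetical stand-in for part of the generated telemetry builder; the real
// generated_telemetry.go is not rendered in this diff.
package metadatasketch

import "go.opentelemetry.io/otel/metric"

// newConnectorConsumedSize registers a byte-count counter on the given meter.
func newConnectorConsumedSize(meter metric.Meter) (metric.Int64Counter, error) {
    return meter.Int64Counter(
        "otelcol_connector_consumed_size", // assumed metric name
        metric.WithUnit("By"),             // bytes
        metric.WithDescription("Number of bytes consumed by the connector."), // assumed description
    )
}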
