Skip to content

Commit 9d6697d

Browse files
committed
Refactor with Enabled check inside obsconsumer
1 parent 4c37d07 commit 9d6697d

17 files changed

+302
-860
lines changed

service/internal/graph/connector.go

Lines changed: 29 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,15 @@ func (n *connectorNode) buildTraces(
8888

8989
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
9090
for _, next := range nexts {
91-
producedOpts := []obsconsumer.Option{
92-
obsconsumer.WithTracesItemCounter(&tb.ConnectorProducedItems),
91+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
92+
next.(consumer.Traces),
93+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
9394
obsconsumer.WithStaticDataPointAttribute(
9495
otelattr.String(
9596
pipelineIDAttrKey,
9697
next.(*capabilitiesNode).pipelineID.String(),
9798
),
9899
),
99-
}
100-
if isEnabled(tb.ConnectorProducedSize) {
101-
producedOpts = append(producedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorProducedSize))
102-
}
103-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
104-
next.(consumer.Traces),
105-
producedOpts...,
106100
)
107101
}
108102
next := connector.NewTracesRouter(consumers)
@@ -114,60 +108,32 @@ func (n *connectorNode) buildTraces(
114108
return err
115109
}
116110

117-
consumedOpts := []obsconsumer.Option{
118-
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
119-
}
120-
if isEnabled(tb.ConnectorConsumedSize) {
121-
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
122-
}
123-
124111
// Connectors which might pass along data must inherit capabilities of all nexts
125112
n.consumer = obsconsumer.NewTraces(
126113
capabilityconsumer.NewTraces(
127114
n.Component.(consumer.Traces),
128115
aggregateCap(n.Component.(consumer.Traces), nexts),
129116
),
130-
consumedOpts...,
117+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
131118
)
132119
case pipeline.SignalMetrics:
133120
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
134121
if err != nil {
135122
return err
136123
}
137-
138-
consumedOpts := []obsconsumer.Option{
139-
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
140-
}
141-
if isEnabled(tb.ConnectorConsumedSize) {
142-
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
143-
}
144-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
124+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
145125
case pipeline.SignalLogs:
146126
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
147127
if err != nil {
148128
return err
149129
}
150-
151-
consumedOpts := []obsconsumer.Option{
152-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
153-
}
154-
if isEnabled(tb.ConnectorConsumedSize) {
155-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
156-
}
157-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
130+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
158131
case xpipeline.SignalProfiles:
159132
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
160133
if err != nil {
161134
return err
162135
}
163-
164-
consumedOpts := []obsconsumer.Option{
165-
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
166-
}
167-
if isEnabled(tb.ConnectorConsumedSize) {
168-
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
169-
}
170-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
136+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
171137
}
172138
return nil
173139
}
@@ -185,21 +151,15 @@ func (n *connectorNode) buildMetrics(
185151

186152
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
187153
for _, next := range nexts {
188-
producedOpts := []obsconsumer.Option{
189-
obsconsumer.WithMetricsItemCounter(&tb.ConnectorProducedItems),
154+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
155+
next.(consumer.Metrics),
156+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
190157
obsconsumer.WithStaticDataPointAttribute(
191158
otelattr.String(
192159
pipelineIDAttrKey,
193160
next.(*capabilitiesNode).pipelineID.String(),
194161
),
195162
),
196-
}
197-
if isEnabled(tb.ConnectorProducedSize) {
198-
producedOpts = append(producedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorProducedSize))
199-
}
200-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
201-
next.(consumer.Metrics),
202-
producedOpts...,
203163
)
204164
}
205165
next := connector.NewMetricsRouter(consumers)
@@ -211,60 +171,32 @@ func (n *connectorNode) buildMetrics(
211171
return err
212172
}
213173

214-
consumedOpts := []obsconsumer.Option{
215-
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
216-
}
217-
if isEnabled(tb.ConnectorConsumedSize) {
218-
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
219-
}
220-
221174
// Connectors which might pass along data must inherit capabilities of all nexts
222175
n.consumer = obsconsumer.NewMetrics(
223176
capabilityconsumer.NewMetrics(
224177
n.Component.(consumer.Metrics),
225178
aggregateCap(n.Component.(consumer.Metrics), nexts),
226179
),
227-
consumedOpts...,
180+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
228181
)
229182
case pipeline.SignalTraces:
230183
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
231184
if err != nil {
232185
return err
233186
}
234-
235-
consumedOpts := []obsconsumer.Option{
236-
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
237-
}
238-
if isEnabled(tb.ConnectorConsumedSize) {
239-
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
240-
}
241-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
187+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
242188
case pipeline.SignalLogs:
243189
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
244190
if err != nil {
245191
return err
246192
}
247-
248-
consumedOpts := []obsconsumer.Option{
249-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
250-
}
251-
if isEnabled(tb.ConnectorConsumedSize) {
252-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
253-
}
254-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
193+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
255194
case xpipeline.SignalProfiles:
256195
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
257196
if err != nil {
258197
return err
259198
}
260-
261-
consumedOpts := []obsconsumer.Option{
262-
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
263-
}
264-
if isEnabled(tb.ConnectorConsumedSize) {
265-
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
266-
}
267-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
199+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
268200
}
269201
return nil
270202
}
@@ -282,20 +214,16 @@ func (n *connectorNode) buildLogs(
282214

283215
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
284216
for _, next := range nexts {
285-
producedOpts := []obsconsumer.Option{
286-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize),
217+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
218+
next.(consumer.Logs),
219+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
287220
obsconsumer.WithStaticDataPointAttribute(
288221
otelattr.String(
289222
pipelineIDAttrKey,
290223
next.(*capabilitiesNode).pipelineID.String(),
291224
),
292225
),
293-
}
294-
if isEnabled(tb.ConnectorProducedSize) {
295-
producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize))
296-
}
297-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
298-
next.(consumer.Logs), producedOpts...)
226+
)
299227
}
300228
next := connector.NewLogsRouter(consumers)
301229

@@ -306,60 +234,32 @@ func (n *connectorNode) buildLogs(
306234
return err
307235
}
308236

309-
consumedOpts := []obsconsumer.Option{
310-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
311-
}
312-
if isEnabled(tb.ConnectorConsumedSize) {
313-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
314-
}
315-
316237
// Connectors which might pass along data must inherit capabilities of all nexts
317238
n.consumer = obsconsumer.NewLogs(
318239
capabilityconsumer.NewLogs(
319240
n.Component.(consumer.Logs),
320241
aggregateCap(n.Component.(consumer.Logs), nexts),
321242
),
322-
consumedOpts...,
243+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
323244
)
324245
case pipeline.SignalTraces:
325246
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
326247
if err != nil {
327248
return err
328249
}
329-
330-
consumedOpts := []obsconsumer.Option{
331-
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
332-
}
333-
if isEnabled(tb.ConnectorConsumedSize) {
334-
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
335-
}
336-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
250+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
337251
case pipeline.SignalMetrics:
338252
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
339253
if err != nil {
340254
return err
341255
}
342-
343-
consumedOpts := []obsconsumer.Option{
344-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
345-
}
346-
if isEnabled(tb.ConnectorConsumedSize) {
347-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
348-
}
349-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
256+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
350257
case xpipeline.SignalProfiles:
351258
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
352259
if err != nil {
353260
return err
354261
}
355-
356-
consumedOpts := []obsconsumer.Option{
357-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
358-
}
359-
if isEnabled(tb.ConnectorConsumedSize) {
360-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
361-
}
362-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
262+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
363263
}
364264
return nil
365265
}
@@ -377,21 +277,15 @@ func (n *connectorNode) buildProfiles(
377277

378278
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
379279
for _, next := range nexts {
380-
producedOpts := []obsconsumer.Option{
381-
obsconsumer.WithProfilesItemCounter(&tb.ConnectorProducedItems),
280+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
281+
next.(xconsumer.Profiles),
282+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
382283
obsconsumer.WithStaticDataPointAttribute(
383284
otelattr.String(
384285
pipelineIDAttrKey,
385286
next.(*capabilitiesNode).pipelineID.String(),
386287
),
387288
),
388-
}
389-
if isEnabled(tb.ConnectorProducedSize) {
390-
producedOpts = append(producedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorProducedSize))
391-
}
392-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
393-
next.(xconsumer.Profiles),
394-
producedOpts...,
395289
)
396290
}
397291
next := xconnector.NewProfilesRouter(consumers)
@@ -403,60 +297,32 @@ func (n *connectorNode) buildProfiles(
403297
return err
404298
}
405299

406-
consumedOpts := []obsconsumer.Option{
407-
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
408-
}
409-
if isEnabled(tb.ConnectorConsumedSize) {
410-
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
411-
}
412-
413300
// Connectors which might pass along data must inherit capabilities of all nexts
414301
n.consumer = obsconsumer.NewProfiles(
415302
capabilityconsumer.NewProfiles(
416303
n.Component.(xconsumer.Profiles),
417304
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
418305
),
419-
consumedOpts...,
306+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
420307
)
421308
case pipeline.SignalTraces:
422309
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
423310
if err != nil {
424311
return err
425312
}
426-
427-
consumedOpts := []obsconsumer.Option{
428-
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
429-
}
430-
if isEnabled(tb.ConnectorConsumedSize) {
431-
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
432-
}
433-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
313+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
434314
case pipeline.SignalMetrics:
435315
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
436316
if err != nil {
437317
return err
438318
}
439-
440-
consumedOpts := []obsconsumer.Option{
441-
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
442-
}
443-
if isEnabled(tb.ConnectorConsumedSize) {
444-
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
445-
}
446-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
319+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
447320
case pipeline.SignalLogs:
448321
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
449322
if err != nil {
450323
return err
451324
}
452-
453-
consumedOpts := []obsconsumer.Option{
454-
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
455-
}
456-
if isEnabled(tb.ConnectorConsumedSize) {
457-
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
458-
}
459-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
325+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
460326
}
461327
return nil
462328
}

service/internal/graph/enabled.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)