Skip to content

Statically validate connector config during dry-run. #12712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/enhance_config-validation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Enhance config validation using <validate> command to capture all validation errors that prevents the collector from starting."

# One or more tracking issues or pull requests related to the change
issues: [8721]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
18 changes: 17 additions & 1 deletion otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,23 @@ func (col *Collector) DryRun(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

return xconfmap.Validate(cfg)
if err = xconfmap.Validate(cfg); err != nil {
return err
}

return service.Validate(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
ReceiversConfigs: cfg.Receivers,
ReceiversFactories: factories.Receivers,
ProcessorsConfigs: cfg.Processors,
ProcessorsFactories: factories.Processors,
ExportersConfigs: cfg.Exporters,
ExportersFactories: factories.Exporters,
ConnectorsConfigs: cfg.Connectors,
ConnectorsFactories: factories.Connectors,
}, service.Config{
Pipelines: cfg.Service.Pipelines,
})
}

func newFallbackLogger(options []zap.Option) (*zap.Logger, error) {
Expand Down
24 changes: 24 additions & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,30 @@ func TestCollectorDryRun(t *testing.T) {
},
expectedErr: `service::pipelines::traces: references processor "invalid" which is not configured`,
},
"invalid_connector_use_unused_exp": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid-connector-unused-exp.yaml")}),
},
expectedErr: `failed to build pipelines: connector "nop/connector1" used as receiver in [logs/in2] pipeline but not used in any supported exporter pipeline`,
},
"invalid_connector_use_unused_rec": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid-connector-unused-rec.yaml")}),
},
expectedErr: `failed to build pipelines: connector "nop/connector1" used as exporter in [logs/in2] pipeline but not used in any supported receiver pipeline`,
},
"cyclic_connector": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-cyclic-connector.yaml")}),
},
expectedErr: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
},
}

for name, test := range tests {
Expand Down
19 changes: 19 additions & 0 deletions otelcol/testdata/otelcol-cyclic-connector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
receivers:
nop:

exporters:
nop:

connectors:
nop/forward:

service:
pipelines:
traces/in:
receivers: [nop/forward]
processors: [ ]
exporters: [nop/forward]
traces/out:
receivers: [nop/forward]
processors: [ ]
exporters: [nop/forward]
23 changes: 23 additions & 0 deletions otelcol/testdata/otelcol-invalid-connector-unused-exp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
nop:

exporters:
nop:

connectors:
nop/connector1:

service:
pipelines:
logs/in1:
receivers: [nop]
processors: [ ]
exporters: [nop]
logs/in2:
receivers: [nop/connector1]
processors: [ ]
exporters: [nop]
logs/out:
receivers: [nop]
processors: [ ]
exporters: [nop]
23 changes: 23 additions & 0 deletions otelcol/testdata/otelcol-invalid-connector-unused-rec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
nop:

exporters:
nop:

connectors:
nop/connector1:

service:
pipelines:
logs/in1:
receivers: [nop]
processors: [ ]
exporters: [nop]
logs/in2:
receivers: [nop]
processors: [ ]
exporters: [nop/connector1]
logs/out:
receivers: [nop]
processors: [ ]
exporters: [nop]
23 changes: 23 additions & 0 deletions otelcol/testdata/otelcol-valid-connector-use.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
nop:

exporters:
nop:

connectors:
nop/connector1:

service:
pipelines:
logs/in1:
receivers: [nop]
processors: [ ]
exporters: [nop]
logs/in2:
receivers: [nop]
processors: [ ]
exporters: [nop/connector1]
logs/out:
receivers: [nop/connector1]
processors: [ ]
exporters: [nop]
1 change: 1 addition & 0 deletions otelcol/unmarshal_dry_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestDryRunWithExpandedValues(t *testing.T) {
mockMap: map[string]string{
"number": "123",
},
expectErr: true,
},
{
name: "string that looks like a bool",
Expand Down
25 changes: 25 additions & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -535,3 +537,26 @@ func configureViews(level configtelemetry.Level) []config.View {
func ptr[T any](v T) *T {
return &v
}

// Validate verifies the graph by calling the internal graph.Build.
func Validate(ctx context.Context, set Settings, cfg Config) error {
tel := component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: nooptrace.NewTracerProvider(),
MeterProvider: noopmetric.NewMeterProvider(),
Resource: pcommon.NewResource(),
}
_, err := graph.Build(ctx, graph.Settings{
Telemetry: tel,
BuildInfo: set.BuildInfo,
ReceiverBuilder: builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories),
ProcessorBuilder: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories),
ExporterBuilder: builders.NewExporter(set.ExportersConfigs, set.ExportersFactories),
ConnectorBuilder: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories),
PipelineConfigs: cfg.Pipelines,
})
if err != nil {
return fmt.Errorf("failed to build pipelines: %w", err)
}
return nil
}
166 changes: 166 additions & 0 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,3 +741,169 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
func newPtr[T int | string](str T) *T {
return &str
}

func TestValidateGraph(t *testing.T) {
testCases := map[string]struct {
connectorCfg map[component.ID]component.Config
receiverCfg map[component.ID]component.Config
exporterCfg map[component.ID]component.Config
pipelinesCfg pipelines.Config
expectedError string
}{
"Valid connector usage": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: "",
},
"Valid without Connector": {
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: "",
},
"Connector used as exporter but not as receiver": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: `failed to build pipelines: connector "nop/connector1" used as exporter in [logs/in2] pipeline but not used in any supported receiver pipeline`,
},
"Connector used as receiver but not as exporter": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: `failed to build pipelines: connector "nop/connector1" used as receiver in [logs/in2] pipeline but not used in any supported exporter pipeline`,
},
"Connector creates direct cycle between pipelines": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "forward"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalTraces, "in"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
},
pipeline.NewIDWithName(pipeline.SignalTraces, "out"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
},
},
expectedError: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
},
}

_, connectorsFactories := builders.NewNopConnectorConfigsAndFactories()
_, receiversFactories := builders.NewNopReceiverConfigsAndFactories()
_, exportersFactories := builders.NewNopExporterConfigsAndFactories()

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
settings := Settings{
ConnectorsConfigs: tc.connectorCfg,
ConnectorsFactories: connectorsFactories,
ReceiversConfigs: tc.receiverCfg,
ReceiversFactories: receiversFactories,
ExportersConfigs: tc.exporterCfg,
ExportersFactories: exportersFactories,
}
cfg := Config{
Pipelines: tc.pipelinesCfg,
}

err := Validate(context.Background(), settings, cfg)
if tc.expectedError == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, tc.expectedError, err.Error())
}
})
}
}
Loading