diff --git a/collector/internal/collector/collector.go b/collector/internal/collector/collector.go index 5f9ea03c7f..5d782a7e15 100644 --- a/collector/internal/collector/collector.go +++ b/collector/internal/collector/collector.go @@ -69,17 +69,22 @@ func getConfig(logger *zap.Logger) string { return defaultVal } -func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string) *Collector { +func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string, customConverters []confmap.ConverterFactory) *Collector { l := logger.Named("NewCollector") + + // Combine built-in converters with custom converters + converters := []confmap.ConverterFactory{ + confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { + return disablequeuedretryconverter.New() + }), + } + converters = append(converters, customConverters...) + cfgSet := otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{getConfig(l)}, ProviderFactories: []confmap.ProviderFactory{fileprovider.NewFactory(), envprovider.NewFactory(), yamlprovider.NewFactory(), httpsprovider.NewFactory(), httpprovider.NewFactory(), s3provider.NewFactory(), secretsmanagerprovider.NewFactory()}, - ConverterFactories: []confmap.ConverterFactory{ - confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { - return disablequeuedretryconverter.New() - }), - }, + ConverterFactories: converters, }, } diff --git a/collector/internal/confmap/converter/accountidprocessor/converter.go b/collector/internal/confmap/converter/accountidprocessor/converter.go new file mode 100644 index 0000000000..47632affb8 --- /dev/null +++ b/collector/internal/confmap/converter/accountidprocessor/converter.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The accountidprocessor implements the Converter for mutating Collector +// configurations to automatically inject the cloud.account.id attribute +// via a resource processor into all pipelines. +package accountidprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/confmap" +) + +const ( + serviceKey = "service" + pipelinesKey = "pipelines" + processorsKey = "processors" + resourceProc = "resource/aws-account-id" + accountIDAttrKey = "cloud.account.id" +) + +type converter struct { + accountID string +} + +// New returns a confmap.Converter that injects cloud.account.id into all pipelines +func New(accountID string) confmap.Converter { + return &converter{accountID: accountID} +} + +func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + if c.accountID == "" { + return nil // Skip if no account ID + } + + // Navigate to service.pipelines + serviceVal := conf.Get(serviceKey) + service, ok := serviceVal.(map[string]interface{}) + if !ok { + return nil + } + + pipelinesVal, ok := service[pipelinesKey] + if !ok { + return nil + } + + pipelines, ok := pipelinesVal.(map[string]interface{}) + if !ok { + return nil + } + + updates := make(map[string]interface{}) + + // For each pipeline, add resource processor to beginning + for telemetryType, pipelineVal := range pipelines { + pipeline, ok := pipelineVal.(map[string]interface{}) + if !ok { + continue + } + + processorsVal, _ := pipeline[processorsKey] + processors, ok := processorsVal.([]interface{}) + if !ok { + processors = []interface{}{} + } + + // Prepend resource/aws-account-id processor + processors = append([]interface{}{resourceProc}, processors...) + updates[fmt.Sprintf("%s::%s::%s::%s", serviceKey, pipelinesKey, telemetryType, processorsKey)] = processors + } + + // Configure the resource processor with cloud.account.id attribute + updates[fmt.Sprintf("processors::%s::attributes", resourceProc)] = []map[string]interface{}{ + { + "key": accountIDAttrKey, + "value": c.accountID, + "action": "insert", + }, + } + + // Apply all updates + if len(updates) > 0 { + if err := conf.Merge(confmap.NewFromStringMap(updates)); err != nil { + return err + } + } + + return nil +} diff --git a/collector/internal/confmap/converter/accountidprocessor/converter_test.go b/collector/internal/confmap/converter/accountidprocessor/converter_test.go new file mode 100644 index 0000000000..f182af1d95 --- /dev/null +++ b/collector/internal/confmap/converter/accountidprocessor/converter_test.go @@ -0,0 +1,291 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package accountidprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestConvert(t *testing.T) { + tests := []struct { + name string + accountID string + input map[string]any + expectedProcs map[string]any + shouldHaveRes bool + }{ + { + name: "empty_account_id", + accountID: "", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + shouldHaveRes: false, + }, + { + name: "no_service", + accountID: "123456789012", + input: map[string]any{ + "receivers": map[string]any{}, + }, + shouldHaveRes: false, + }, + { + name: "no_pipelines", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{}, + }, + shouldHaveRes: false, + }, + { + name: "single_pipeline_with_processors", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "123456789012", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "single_pipeline_no_processors", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "123456789012", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "multiple_pipelines", + accountID: "987654321098", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + "logs": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{}, + "exporters": []any{"otlp"}, + }, + "metrics": map[string]any{ + "receivers": []any{"otlp"}, + "exporters": []any{"prometheus"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "987654321098", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "existing_processors", + accountID: "111111111111", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch", "attributes"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "111111111111", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.input) + converter := New(tt.accountID) + err := converter.Convert(context.Background(), conf) + require.NoError(t, err) + + if !tt.shouldHaveRes { + // For cases where no resource processor should be added + if procVal := conf.Get("processors"); procVal != nil { + procs, ok := procVal.(map[string]any) + if ok { + assert.NotContains(t, procs, "resource/aws-account-id") + } + } + return + } + + // Check that resource processor was added + procVal := conf.Get("processors") + require.NotNil(t, procVal) + procs, ok := procVal.(map[string]any) + require.True(t, ok, "processors should be a map") + + resourceProc, ok := procs["resource/aws-account-id"] + require.True(t, ok, "resource/aws-account-id processor should exist") + + // Verify processor configuration + expectedProc := tt.expectedProcs["resource/aws-account-id"] + assert.Equal(t, expectedProc, resourceProc) + + // Check that all pipelines have the resource processor prepended + serviceVal := conf.Get("service") + require.NotNil(t, serviceVal) + service, ok := serviceVal.(map[string]any) + require.True(t, ok) + + pipelinesVal, ok := service["pipelines"] + require.True(t, ok) + pipelines, ok := pipelinesVal.(map[string]any) + require.True(t, ok) + + for pipelineType, pipelineVal := range pipelines { + pipeline, ok := pipelineVal.(map[string]any) + require.True(t, ok, "pipeline %s should be a map", pipelineType) + + processorsVal, ok := pipeline["processors"] + require.True(t, ok, "pipeline %s should have processors", pipelineType) + processors, ok := processorsVal.([]any) + require.True(t, ok, "processors should be a slice") + + // First processor should be resource/aws-account-id + require.Greater(t, len(processors), 0, "pipeline %s should have at least one processor", pipelineType) + assert.Equal(t, "resource/aws-account-id", processors[0], "first processor in %s should be resource/aws-account-id", pipelineType) + } + }) + } +} + +func TestConvert_AccountIDValues(t *testing.T) { + tests := []struct { + name string + accountID string + }{ + {"12_digits", "123456789012"}, + {"different_account", "999999999999"}, + {"all_zeros", "000000000000"}, + {"sequential", "111111111111"}, + {"leading_zero", "012345678901"}, + {"multiple_leading_zeros", "001234567890"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{}, + "exporters": []any{"otlp"}, + }, + }, + }, + } + + conf := confmap.NewFromStringMap(input) + converter := New(tt.accountID) + err := converter.Convert(context.Background(), conf) + require.NoError(t, err) + + // Verify the account ID is correctly set + procVal := conf.Get("processors") + procs := procVal.(map[string]any) + resourceProc := procs["resource/aws-account-id"].(map[string]any) + attributes := resourceProc["attributes"].([]map[string]any) + + require.Equal(t, 1, len(attributes)) + assert.Equal(t, tt.accountID, attributes[0]["value"]) + }) + } +} + +func TestNew(t *testing.T) { + accountID := "123456789012" + converter := New(accountID) + assert.NotNil(t, converter) +} diff --git a/collector/internal/extensionapi/client.go b/collector/internal/extensionapi/client.go index 7210a07efa..f68ed9fc35 100644 --- a/collector/internal/extensionapi/client.go +++ b/collector/internal/extensionapi/client.go @@ -30,6 +30,7 @@ type RegisterResponse struct { FunctionName string `json:"functionName"` FunctionVersion string `json:"functionVersion"` Handler string `json:"handler"` + AccountId string `json:"accountId"` ExtensionID string } @@ -65,9 +66,11 @@ const ( ) const ( - extensionNameHeader = "Lambda-Extension-Name" - extensionIdentiferHeader = "Lambda-Extension-Identifier" - extensionErrorType = "Lambda-Extension-Function-Error-Type" + extensionNameHeader = "Lambda-Extension-Name" + extensionIdentiferHeader = "Lambda-Extension-Identifier" + extensionErrorType = "Lambda-Extension-Function-Error-Type" + extensionAcceptFeatureHeader = "Lambda-Extension-Accept-Feature" + accountIdFeature = "accountId" ) // Client is a simple client for the Lambda Extensions API. @@ -104,6 +107,8 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon return nil, err } req.Header.Set(extensionNameHeader, filename) + // Request accountId feature in the response + req.Header.Set(extensionAcceptFeatureHeader, accountIdFeature) var registerResp RegisterResponse resp, err := e.doRequest(req, ®isterResp) diff --git a/collector/internal/extensionapi/client_test.go b/collector/internal/extensionapi/client_test.go new file mode 100644 index 0000000000..7d51e81e9b --- /dev/null +++ b/collector/internal/extensionapi/client_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensionapi + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRegisterResponseUnmarshalJSON(t *testing.T) { + tests := []struct { + name string + jsonData string + expectedID string + shouldError bool + }{ + { + name: "standard_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "123456789012" + }`, + expectedID: "123456789012", + shouldError: false, + }, + { + name: "account_id_with_leading_zero", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "012345678901" + }`, + expectedID: "012345678901", + shouldError: false, + }, + { + name: "account_id_with_multiple_leading_zeros", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "001234567890" + }`, + expectedID: "001234567890", + shouldError: false, + }, + { + name: "all_zeros_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "000000000000" + }`, + expectedID: "000000000000", + shouldError: false, + }, + { + name: "missing_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler" + }`, + expectedID: "", + shouldError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var resp RegisterResponse + err := json.Unmarshal([]byte(tt.jsonData), &resp) + + if tt.shouldError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedID, resp.AccountId, "AccountId should match exactly (leading zeros preserved)") + assert.Equal(t, "test-function", resp.FunctionName) + assert.Equal(t, "$LATEST", resp.FunctionVersion) + assert.Equal(t, "index.handler", resp.Handler) + } + }) + } +} + +func TestRegisterResponseLeadingZerosPreserved(t *testing.T) { + // This test specifically validates that leading zeros are preserved + // through the entire JSON unmarshaling process + jsonData := `{ + "functionName": "my-function", + "functionVersion": "1", + "handler": "handler.main", + "accountId": "012345678901" + }` + + var resp RegisterResponse + err := json.Unmarshal([]byte(jsonData), &resp) + require.NoError(t, err) + + // Verify leading zero is preserved + assert.Equal(t, "012345678901", resp.AccountId) + assert.Len(t, resp.AccountId, 12, "Account ID should be exactly 12 characters") + + // Verify it's a string, not converted to a number + assert.IsType(t, "", resp.AccountId) +} diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 052c45f671..eda89c3dbf 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -86,8 +86,8 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex listener: listener, } - factories, _ := lambdacomponents.Components(res.ExtensionID) - lm.collector = collector.NewCollector(logger, factories, version) + factories, converters, _ := lambdacomponents.Components(res.ExtensionID, res.AccountId) + lm.collector = collector.NewCollector(logger, factories, version, converters) return ctx, lm } diff --git a/collector/lambdacomponents/default.go b/collector/lambdacomponents/default.go index e054bcbea4..58ff751d5a 100644 --- a/collector/lambdacomponents/default.go +++ b/collector/lambdacomponents/default.go @@ -26,6 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/debugexporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/exporter/otlphttpexporter" @@ -36,11 +37,12 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/accountidprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" ) -func Components(extensionID string) (otelcol.Factories, error) { +func Components(extensionID string, accountID string) (otelcol.Factories, []confmap.ConverterFactory, error) { var errs []error receivers, err := otelcol.MakeFactoryMap( @@ -91,5 +93,12 @@ func Components(extensionID string) (otelcol.Factories, error) { Extensions: extensions, } - return factories, multierr.Combine(errs...) + // Create converter factories + converters := []confmap.ConverterFactory{ + confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { + return accountidprocessor.New(accountID) + }), + } + + return factories, converters, multierr.Combine(errs...) }