From 2a44381cf7163ec8bbfca0698b2591ac596cabbf Mon Sep 17 00:00:00 2001 From: Raphael Manke Date: Fri, 24 Oct 2025 14:00:09 +0200 Subject: [PATCH] feat: auto-inject AWS account ID into telemetry from Lambda extension registration Automatically extract the AWS account ID from the Lambda Extensions API registration response and inject it as the 'cloud.account.id' attribute into all telemetry (traces, logs, metrics) via a confmap converter. Changes: - Add AccountId field to RegisterResponse struct - Request Lambda-Extension-Accept-Feature header with 'accountId' value - Create accountidprocessor converter to inject cloud.account.id attribute - Update lambdacomponents.Components to accept accountID and return converters - Update collector.NewCollector to accept and register custom converters - Update manager to pass account ID through the initialization flow Includes: - Comprehensive tests for JSON unmarshaling with leading zero preservation - Tests for converter behavior across different pipeline configurations - Tests for account ID handling in the extension API client Benefits: - Account ID automatically available in all telemetry without configuration - No environment variables needed, uses AWS Lambda API response - Follows OpenTelemetry Collector converter pattern - Static injection at startup, no runtime overhead --- collector/internal/collector/collector.go | 17 +- .../converter/accountidprocessor/converter.go | 103 +++++++ .../accountidprocessor/converter_test.go | 291 ++++++++++++++++++ collector/internal/extensionapi/client.go | 11 +- .../internal/extensionapi/client_test.go | 126 ++++++++ collector/internal/lifecycle/manager.go | 4 +- collector/lambdacomponents/default.go | 13 +- 7 files changed, 552 insertions(+), 13 deletions(-) create mode 100644 collector/internal/confmap/converter/accountidprocessor/converter.go create mode 100644 collector/internal/confmap/converter/accountidprocessor/converter_test.go create mode 100644 collector/internal/extensionapi/client_test.go diff --git a/collector/internal/collector/collector.go b/collector/internal/collector/collector.go index 5f9ea03c7f..5d782a7e15 100644 --- a/collector/internal/collector/collector.go +++ b/collector/internal/collector/collector.go @@ -69,17 +69,22 @@ func getConfig(logger *zap.Logger) string { return defaultVal } -func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string) *Collector { +func NewCollector(logger *zap.Logger, factories otelcol.Factories, version string, customConverters []confmap.ConverterFactory) *Collector { l := logger.Named("NewCollector") + + // Combine built-in converters with custom converters + converters := []confmap.ConverterFactory{ + confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { + return disablequeuedretryconverter.New() + }), + } + converters = append(converters, customConverters...) + cfgSet := otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{getConfig(l)}, ProviderFactories: []confmap.ProviderFactory{fileprovider.NewFactory(), envprovider.NewFactory(), yamlprovider.NewFactory(), httpsprovider.NewFactory(), httpprovider.NewFactory(), s3provider.NewFactory(), secretsmanagerprovider.NewFactory()}, - ConverterFactories: []confmap.ConverterFactory{ - confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { - return disablequeuedretryconverter.New() - }), - }, + ConverterFactories: converters, }, } diff --git a/collector/internal/confmap/converter/accountidprocessor/converter.go b/collector/internal/confmap/converter/accountidprocessor/converter.go new file mode 100644 index 0000000000..47632affb8 --- /dev/null +++ b/collector/internal/confmap/converter/accountidprocessor/converter.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The accountidprocessor implements the Converter for mutating Collector +// configurations to automatically inject the cloud.account.id attribute +// via a resource processor into all pipelines. +package accountidprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/confmap" +) + +const ( + serviceKey = "service" + pipelinesKey = "pipelines" + processorsKey = "processors" + resourceProc = "resource/aws-account-id" + accountIDAttrKey = "cloud.account.id" +) + +type converter struct { + accountID string +} + +// New returns a confmap.Converter that injects cloud.account.id into all pipelines +func New(accountID string) confmap.Converter { + return &converter{accountID: accountID} +} + +func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + if c.accountID == "" { + return nil // Skip if no account ID + } + + // Navigate to service.pipelines + serviceVal := conf.Get(serviceKey) + service, ok := serviceVal.(map[string]interface{}) + if !ok { + return nil + } + + pipelinesVal, ok := service[pipelinesKey] + if !ok { + return nil + } + + pipelines, ok := pipelinesVal.(map[string]interface{}) + if !ok { + return nil + } + + updates := make(map[string]interface{}) + + // For each pipeline, add resource processor to beginning + for telemetryType, pipelineVal := range pipelines { + pipeline, ok := pipelineVal.(map[string]interface{}) + if !ok { + continue + } + + processorsVal, _ := pipeline[processorsKey] + processors, ok := processorsVal.([]interface{}) + if !ok { + processors = []interface{}{} + } + + // Prepend resource/aws-account-id processor + processors = append([]interface{}{resourceProc}, processors...) + updates[fmt.Sprintf("%s::%s::%s::%s", serviceKey, pipelinesKey, telemetryType, processorsKey)] = processors + } + + // Configure the resource processor with cloud.account.id attribute + updates[fmt.Sprintf("processors::%s::attributes", resourceProc)] = []map[string]interface{}{ + { + "key": accountIDAttrKey, + "value": c.accountID, + "action": "insert", + }, + } + + // Apply all updates + if len(updates) > 0 { + if err := conf.Merge(confmap.NewFromStringMap(updates)); err != nil { + return err + } + } + + return nil +} diff --git a/collector/internal/confmap/converter/accountidprocessor/converter_test.go b/collector/internal/confmap/converter/accountidprocessor/converter_test.go new file mode 100644 index 0000000000..f182af1d95 --- /dev/null +++ b/collector/internal/confmap/converter/accountidprocessor/converter_test.go @@ -0,0 +1,291 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package accountidprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestConvert(t *testing.T) { + tests := []struct { + name string + accountID string + input map[string]any + expectedProcs map[string]any + shouldHaveRes bool + }{ + { + name: "empty_account_id", + accountID: "", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + shouldHaveRes: false, + }, + { + name: "no_service", + accountID: "123456789012", + input: map[string]any{ + "receivers": map[string]any{}, + }, + shouldHaveRes: false, + }, + { + name: "no_pipelines", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{}, + }, + shouldHaveRes: false, + }, + { + name: "single_pipeline_with_processors", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "123456789012", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "single_pipeline_no_processors", + accountID: "123456789012", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "123456789012", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "multiple_pipelines", + accountID: "987654321098", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch"}, + "exporters": []any{"otlp"}, + }, + "logs": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{}, + "exporters": []any{"otlp"}, + }, + "metrics": map[string]any{ + "receivers": []any{"otlp"}, + "exporters": []any{"prometheus"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "987654321098", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + { + name: "existing_processors", + accountID: "111111111111", + input: map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{"batch", "attributes"}, + "exporters": []any{"otlp"}, + }, + }, + }, + }, + expectedProcs: map[string]any{ + "resource/aws-account-id": map[string]any{ + "attributes": []map[string]any{ + { + "key": "cloud.account.id", + "value": "111111111111", + "action": "insert", + }, + }, + }, + }, + shouldHaveRes: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.input) + converter := New(tt.accountID) + err := converter.Convert(context.Background(), conf) + require.NoError(t, err) + + if !tt.shouldHaveRes { + // For cases where no resource processor should be added + if procVal := conf.Get("processors"); procVal != nil { + procs, ok := procVal.(map[string]any) + if ok { + assert.NotContains(t, procs, "resource/aws-account-id") + } + } + return + } + + // Check that resource processor was added + procVal := conf.Get("processors") + require.NotNil(t, procVal) + procs, ok := procVal.(map[string]any) + require.True(t, ok, "processors should be a map") + + resourceProc, ok := procs["resource/aws-account-id"] + require.True(t, ok, "resource/aws-account-id processor should exist") + + // Verify processor configuration + expectedProc := tt.expectedProcs["resource/aws-account-id"] + assert.Equal(t, expectedProc, resourceProc) + + // Check that all pipelines have the resource processor prepended + serviceVal := conf.Get("service") + require.NotNil(t, serviceVal) + service, ok := serviceVal.(map[string]any) + require.True(t, ok) + + pipelinesVal, ok := service["pipelines"] + require.True(t, ok) + pipelines, ok := pipelinesVal.(map[string]any) + require.True(t, ok) + + for pipelineType, pipelineVal := range pipelines { + pipeline, ok := pipelineVal.(map[string]any) + require.True(t, ok, "pipeline %s should be a map", pipelineType) + + processorsVal, ok := pipeline["processors"] + require.True(t, ok, "pipeline %s should have processors", pipelineType) + processors, ok := processorsVal.([]any) + require.True(t, ok, "processors should be a slice") + + // First processor should be resource/aws-account-id + require.Greater(t, len(processors), 0, "pipeline %s should have at least one processor", pipelineType) + assert.Equal(t, "resource/aws-account-id", processors[0], "first processor in %s should be resource/aws-account-id", pipelineType) + } + }) + } +} + +func TestConvert_AccountIDValues(t *testing.T) { + tests := []struct { + name string + accountID string + }{ + {"12_digits", "123456789012"}, + {"different_account", "999999999999"}, + {"all_zeros", "000000000000"}, + {"sequential", "111111111111"}, + {"leading_zero", "012345678901"}, + {"multiple_leading_zeros", "001234567890"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := map[string]any{ + "service": map[string]any{ + "pipelines": map[string]any{ + "traces": map[string]any{ + "receivers": []any{"otlp"}, + "processors": []any{}, + "exporters": []any{"otlp"}, + }, + }, + }, + } + + conf := confmap.NewFromStringMap(input) + converter := New(tt.accountID) + err := converter.Convert(context.Background(), conf) + require.NoError(t, err) + + // Verify the account ID is correctly set + procVal := conf.Get("processors") + procs := procVal.(map[string]any) + resourceProc := procs["resource/aws-account-id"].(map[string]any) + attributes := resourceProc["attributes"].([]map[string]any) + + require.Equal(t, 1, len(attributes)) + assert.Equal(t, tt.accountID, attributes[0]["value"]) + }) + } +} + +func TestNew(t *testing.T) { + accountID := "123456789012" + converter := New(accountID) + assert.NotNil(t, converter) +} diff --git a/collector/internal/extensionapi/client.go b/collector/internal/extensionapi/client.go index 7210a07efa..f68ed9fc35 100644 --- a/collector/internal/extensionapi/client.go +++ b/collector/internal/extensionapi/client.go @@ -30,6 +30,7 @@ type RegisterResponse struct { FunctionName string `json:"functionName"` FunctionVersion string `json:"functionVersion"` Handler string `json:"handler"` + AccountId string `json:"accountId"` ExtensionID string } @@ -65,9 +66,11 @@ const ( ) const ( - extensionNameHeader = "Lambda-Extension-Name" - extensionIdentiferHeader = "Lambda-Extension-Identifier" - extensionErrorType = "Lambda-Extension-Function-Error-Type" + extensionNameHeader = "Lambda-Extension-Name" + extensionIdentiferHeader = "Lambda-Extension-Identifier" + extensionErrorType = "Lambda-Extension-Function-Error-Type" + extensionAcceptFeatureHeader = "Lambda-Extension-Accept-Feature" + accountIdFeature = "accountId" ) // Client is a simple client for the Lambda Extensions API. @@ -104,6 +107,8 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon return nil, err } req.Header.Set(extensionNameHeader, filename) + // Request accountId feature in the response + req.Header.Set(extensionAcceptFeatureHeader, accountIdFeature) var registerResp RegisterResponse resp, err := e.doRequest(req, ®isterResp) diff --git a/collector/internal/extensionapi/client_test.go b/collector/internal/extensionapi/client_test.go new file mode 100644 index 0000000000..7d51e81e9b --- /dev/null +++ b/collector/internal/extensionapi/client_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensionapi + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRegisterResponseUnmarshalJSON(t *testing.T) { + tests := []struct { + name string + jsonData string + expectedID string + shouldError bool + }{ + { + name: "standard_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "123456789012" + }`, + expectedID: "123456789012", + shouldError: false, + }, + { + name: "account_id_with_leading_zero", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "012345678901" + }`, + expectedID: "012345678901", + shouldError: false, + }, + { + name: "account_id_with_multiple_leading_zeros", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "001234567890" + }`, + expectedID: "001234567890", + shouldError: false, + }, + { + name: "all_zeros_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler", + "accountId": "000000000000" + }`, + expectedID: "000000000000", + shouldError: false, + }, + { + name: "missing_account_id", + jsonData: `{ + "functionName": "test-function", + "functionVersion": "$LATEST", + "handler": "index.handler" + }`, + expectedID: "", + shouldError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var resp RegisterResponse + err := json.Unmarshal([]byte(tt.jsonData), &resp) + + if tt.shouldError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedID, resp.AccountId, "AccountId should match exactly (leading zeros preserved)") + assert.Equal(t, "test-function", resp.FunctionName) + assert.Equal(t, "$LATEST", resp.FunctionVersion) + assert.Equal(t, "index.handler", resp.Handler) + } + }) + } +} + +func TestRegisterResponseLeadingZerosPreserved(t *testing.T) { + // This test specifically validates that leading zeros are preserved + // through the entire JSON unmarshaling process + jsonData := `{ + "functionName": "my-function", + "functionVersion": "1", + "handler": "handler.main", + "accountId": "012345678901" + }` + + var resp RegisterResponse + err := json.Unmarshal([]byte(jsonData), &resp) + require.NoError(t, err) + + // Verify leading zero is preserved + assert.Equal(t, "012345678901", resp.AccountId) + assert.Len(t, resp.AccountId, 12, "Account ID should be exactly 12 characters") + + // Verify it's a string, not converted to a number + assert.IsType(t, "", resp.AccountId) +} diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 052c45f671..eda89c3dbf 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -86,8 +86,8 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex listener: listener, } - factories, _ := lambdacomponents.Components(res.ExtensionID) - lm.collector = collector.NewCollector(logger, factories, version) + factories, converters, _ := lambdacomponents.Components(res.ExtensionID, res.AccountId) + lm.collector = collector.NewCollector(logger, factories, version, converters) return ctx, lm } diff --git a/collector/lambdacomponents/default.go b/collector/lambdacomponents/default.go index e054bcbea4..58ff751d5a 100644 --- a/collector/lambdacomponents/default.go +++ b/collector/lambdacomponents/default.go @@ -26,6 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/debugexporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/exporter/otlphttpexporter" @@ -36,11 +37,12 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/confmap/converter/accountidprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor" "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" ) -func Components(extensionID string) (otelcol.Factories, error) { +func Components(extensionID string, accountID string) (otelcol.Factories, []confmap.ConverterFactory, error) { var errs []error receivers, err := otelcol.MakeFactoryMap( @@ -91,5 +93,12 @@ func Components(extensionID string) (otelcol.Factories, error) { Extensions: extensions, } - return factories, multierr.Combine(errs...) + // Create converter factories + converters := []confmap.ConverterFactory{ + confmap.NewConverterFactory(func(set confmap.ConverterSettings) confmap.Converter { + return accountidprocessor.New(accountID) + }), + } + + return factories, converters, multierr.Combine(errs...) }