From 78b91e8212e8444e8ec134ad240dd276906abfb1 Mon Sep 17 00:00:00 2001 From: adamantal Date: Wed, 29 Oct 2025 13:15:52 +0100 Subject: [PATCH 1/4] feat(receiver): aws-like report log for invocations --- collector/internal/telemetryapi/types.go | 2 + .../receiver/telemetryapireceiver/config.go | 1 + .../receiver/telemetryapireceiver/receiver.go | 71 ++++++- .../telemetryapireceiver/receiver_test.go | 187 ++++++++++++++++++ 4 files changed, 259 insertions(+), 2 deletions(-) diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 151589d338..5fc8d7cd42 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -24,6 +24,8 @@ const ( PlatformInitStart EventType = Platform + ".initStart" // PlatformInitRuntimeDone is used when function initialization ended. PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone" + // PlatformReport is used when a report of function invocation is received. + PlatformReport EventType = Platform + ".report" // Function invocation started. PlatformStart EventType = Platform + ".start" // The runtime finished processing an event with either success or failure. diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index b51ef1ed57..246b8dde39 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -23,6 +23,7 @@ type Config struct { extensionID string Port int `mapstructure:"port"` Types []string `mapstructure:"types"` + LogReport bool `mapstructure:"log_report"` } // Validate validates the configuration by checking for missing or invalid fields diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index b8631809d0..f782fbafe5 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -41,8 +41,16 @@ import ( "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const initialQueueSize = 5 -const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" +const ( + initialQueueSize = 5 + scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" + + logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %d ms Memory Size: %d MB Max Memory Used: %d MB" + metricBilledDurationMs = "billedDurationMs" + metricDurationMs = "durationMs" + metricMaxMemoryUsedMB = "maxMemoryUsedMB" + metricMemorySizeMB = "memorySizeMB" +) type telemetryAPIReceiver struct { httpServer *http.Server @@ -57,6 +65,7 @@ type telemetryAPIReceiver struct { types []telemetryapi.EventType resource pcommon.Resource currentFaasInvocationID string + logReport bool } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { @@ -242,12 +251,69 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } else if el.Type == string(telemetryapi.PlatformRuntimeDone) { r.currentFaasInvocationID = "" + } else if el.Type == string(telemetryapi.PlatformReport) && r.logReport { + if record, ok := el.Record.(map[string]interface{}); ok { + if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil { + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + continue + } + } + } } } } return log, nil } +// createReportLogRecord creates a log record for the platform.report event +// returns the log record if successful, otherwise nil +func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord { + // gathering metrics + metrics, ok := record["metrics"].(map[string]interface{}) + if !ok { + return nil + } + var durationMs float64 + var billedDurationMs, memorySizeMB, maxMemoryUsedMB int + if durationMs, ok = metrics[metricDurationMs].(float64); !ok { + return nil + } + if billedDurationMs, ok = metrics[metricBilledDurationMs].(int); !ok { + return nil + } + if memorySizeMB, ok = metrics[metricMemorySizeMB].(int); !ok { + return nil + } + if maxMemoryUsedMB, ok = metrics[metricMaxMemoryUsedMB].(int); !ok { + return nil + } + + // gathering requestId + requestId := "" + if requestId, ok = record["requestId"].(string); !ok { + return nil + } + + // we have all information available, we can create the log record + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Body().SetStr( + fmt.Sprintf( + logReportFmt, + requestId, + durationMs, + billedDurationMs, + memorySizeMB, + maxMemoryUsedMB, + ), + ) + + return &logRecord +} + func severityTextToNumber(severityText string) plog.SeverityNumber { mapping := map[string]plog.SeverityNumber{ "TRACE": plog.SeverityNumberTrace, @@ -366,6 +432,7 @@ func newTelemetryAPIReceiver( port: cfg.Port, types: subscribedTypes, resource: r, + logReport: cfg.LogReport, }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 0a63f14e53..6b482d2990 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -516,6 +516,193 @@ func TestCreateLogs(t *testing.T) { } } +func TestCreateLogsWithLogReport(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + slice []event + logReport bool + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectError bool + }{ + { + desc: "platform.report with logReport enabled - valid metrics", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB", + expectError: false, + }, + { + desc: "platform.report with logReport disabled", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - missing requestId", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - invalid timestamp", + slice: []event{ + { + Time: "invalid-timestamp", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - missing metrics", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - invalid metrics format", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": "invalid", + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - record not a map", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: "invalid record format", + }, + }, + logReport: true, + expectedLogRecords: 0, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r, err := newTelemetryAPIReceiver( + &Config{LogReport: tc.logReport}, + receivertest.NewNopSettings(Type), + ) + require.NoError(t, err) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + if tc.expectedTimestamp != "" { + expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + } else { + // For invalid timestamps, no timestamp should be set (zero value) + require.Equal(t, pcommon.Timestamp(0), logRecord.Timestamp()) + } + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} + func TestSeverityTextToNumber(t *testing.T) { t.Parallel() From 4dd17cd0e0b36d9c97b482ca27b3c4d22e3e5c6f Mon Sep 17 00:00:00 2001 From: adamantal Date: Wed, 29 Oct 2025 15:23:02 +0100 Subject: [PATCH 2/4] add init duration --- .../receiver/telemetryapireceiver/receiver.go | 43 +++++++++----- .../telemetryapireceiver/receiver_test.go | 56 ++++++++++++++++++- 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index f782fbafe5..146f8e9007 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -45,11 +45,12 @@ const ( initialQueueSize = 5 scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %d ms Memory Size: %d MB Max Memory Used: %d MB" + logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" metricBilledDurationMs = "billedDurationMs" metricDurationMs = "durationMs" metricMaxMemoryUsedMB = "maxMemoryUsedMB" metricMemorySizeMB = "memorySizeMB" + metricInitDurationMs = "initDurationMs" ) type telemetryAPIReceiver struct { @@ -277,21 +278,28 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface if !ok { return nil } - var durationMs float64 - var billedDurationMs, memorySizeMB, maxMemoryUsedMB int + var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 if durationMs, ok = metrics[metricDurationMs].(float64); !ok { return nil } - if billedDurationMs, ok = metrics[metricBilledDurationMs].(int); !ok { + if billedDurationMs, ok = metrics[metricBilledDurationMs].(float64); !ok { return nil } - if memorySizeMB, ok = metrics[metricMemorySizeMB].(int); !ok { + if memorySizeMB, ok = metrics[metricMemorySizeMB].(float64); !ok { return nil } - if maxMemoryUsedMB, ok = metrics[metricMaxMemoryUsedMB].(int); !ok { + if maxMemoryUsedMB, ok = metrics[metricMaxMemoryUsedMB].(float64); !ok { return nil } + // optionally gather information about cold start time + var initDurationMs float64 + if initDurationMsVal, exists := metrics[metricInitDurationMs]; exists { + if val, ok := initDurationMsVal.(float64); ok { + initDurationMs = val + } + } + // gathering requestId requestId := "" if requestId, ok = record["requestId"].(string); !ok { @@ -300,16 +308,21 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface // we have all information available, we can create the log record logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Body().SetStr( - fmt.Sprintf( - logReportFmt, - requestId, - durationMs, - billedDurationMs, - memorySizeMB, - maxMemoryUsedMB, - ), + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + + // building the body of the log record, optionally adding the init duration + body := fmt.Sprintf( + logReportFmt, + requestId, + durationMs, + billedDurationMs, + memorySizeMB, + maxMemoryUsedMB, ) + if initDurationMs > 0 { + body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) + } + logRecord.Body().SetStr(body) return &logRecord } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 6b482d2990..888ece8153 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -539,9 +539,9 @@ func TestCreateLogsWithLogReport(t *testing.T) { "requestId": "test-request-id-123", "metrics": map[string]any{ "durationMs": 123.45, - "billedDurationMs": 124, - "memorySizeMB": 512, - "maxMemoryUsedMB": 256, + "billedDurationMs": float64(124), + "memorySizeMB": float64(512), + "maxMemoryUsedMB": float64(256), }, }, }, @@ -664,6 +664,56 @@ func TestCreateLogsWithLogReport(t *testing.T) { expectedLogRecords: 0, expectError: false, }, + { + desc: "platform.report with logReport enabled - with initDurationMs", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124.0, + "memorySizeMB": 512.0, + "maxMemoryUsedMB": 256.0, + "initDurationMs": 50.5, + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB Init Duration: 50.50 ms", + expectError: false, + }, + { + desc: "platform.report with logReport enabled - with invalid initDurationMs type", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124.0, + "memorySizeMB": 512.0, + "maxMemoryUsedMB": 256.0, + "initDurationMs": "invalid-string", + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB", + expectError: false, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { From 568bd9323e2549bb5ced8816388cb2a38758a844 Mon Sep 17 00:00:00 2001 From: adamantal Date: Wed, 29 Oct 2025 15:30:55 +0100 Subject: [PATCH 3/4] use types --- collector/internal/telemetryapi/types.go | 12 ++++++++++++ .../receiver/telemetryapireceiver/receiver.go | 16 +++++----------- .../telemetryapireceiver/receiver_test.go | 1 + 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 5fc8d7cd42..1cd4583c9b 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -97,3 +97,15 @@ type Event struct { Type string `json:"type"` Record map[string]any `json:"record"` } + +// MetricType represents the type of metric in the platform.report event +// see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#ReportMetrics +type MetricType string + +const ( + MetricBilledDurationMs MetricType = "billedDurationMs" + MetricDurationMs MetricType = "durationMs" + MetricMaxMemoryUsedMB MetricType = "maxMemoryUsedMB" + MetricMemorySizeMB MetricType = "memorySizeMB" + MetricInitDurationMs MetricType = "initDurationMs" +) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 146f8e9007..0535383282 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -44,13 +44,7 @@ import ( const ( initialQueueSize = 5 scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - - logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" - metricBilledDurationMs = "billedDurationMs" - metricDurationMs = "durationMs" - metricMaxMemoryUsedMB = "maxMemoryUsedMB" - metricMemorySizeMB = "memorySizeMB" - metricInitDurationMs = "initDurationMs" + logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" ) type telemetryAPIReceiver struct { @@ -279,16 +273,16 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface return nil } var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 - if durationMs, ok = metrics[metricDurationMs].(float64); !ok { + if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { return nil } - if billedDurationMs, ok = metrics[metricBilledDurationMs].(float64); !ok { + if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { return nil } - if memorySizeMB, ok = metrics[metricMemorySizeMB].(float64); !ok { + if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { return nil } - if maxMemoryUsedMB, ok = metrics[metricMaxMemoryUsedMB].(float64); !ok { + if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { return nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 888ece8153..346ec2fc2b 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -527,6 +527,7 @@ func TestCreateLogsWithLogReport(t *testing.T) { expectedType string expectedTimestamp string expectedBody string + expectedAttributes map[string]interface{} expectError bool }{ { From 15dfdcf06ba6c4dccf3e79b67294c5bf8547e4a8 Mon Sep 17 00:00:00 2001 From: adamantal Date: Wed, 29 Oct 2025 16:04:00 +0100 Subject: [PATCH 4/4] fix tests --- collector/receiver/telemetryapireceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 0535383282..773a335b20 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -288,7 +288,7 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface // optionally gather information about cold start time var initDurationMs float64 - if initDurationMsVal, exists := metrics[metricInitDurationMs]; exists { + if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists { if val, ok := initDurationMsVal.(float64); ok { initDurationMs = val }