Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions collector/internal/telemetryapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
PlatformInitStart EventType = Platform + ".initStart"
// PlatformInitRuntimeDone is used when function initialization ended.
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
// PlatformReport is used when a report of function invocation is received.
PlatformReport EventType = Platform + ".report"
// Function invocation started.
PlatformStart EventType = Platform + ".start"
// The runtime finished processing an event with either success or failure.
Expand Down Expand Up @@ -95,3 +97,15 @@ type Event struct {
Type string `json:"type"`
Record map[string]any `json:"record"`
}

// MetricType represents the type of metric in the platform.report event
// see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#ReportMetrics
type MetricType string

const (
MetricBilledDurationMs MetricType = "billedDurationMs"
MetricDurationMs MetricType = "durationMs"
MetricMaxMemoryUsedMB MetricType = "maxMemoryUsedMB"
MetricMemorySizeMB MetricType = "memorySizeMB"
MetricInitDurationMs MetricType = "initDurationMs"
)
1 change: 1 addition & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
LogReport bool `mapstructure:"log_report"`
}

// Validate validates the configuration by checking for missing or invalid fields
Expand Down
78 changes: 76 additions & 2 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ import (
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
)

const initialQueueSize = 5
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
const (
initialQueueSize = 5
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
)

type telemetryAPIReceiver struct {
httpServer *http.Server
Expand All @@ -57,6 +60,7 @@ type telemetryAPIReceiver struct {
types []telemetryapi.EventType
resource pcommon.Resource
currentFaasInvocationID string
logReport bool
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -242,12 +246,81 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
}
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
r.currentFaasInvocationID = ""
} else if el.Type == string(telemetryapi.PlatformReport) && r.logReport {
if record, ok := el.Record.(map[string]interface{}); ok {
if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil {
logRecord.Attributes().PutStr("type", el.Type)
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
} else {
continue
}
}
}
}
}
}
return log, nil
}

// createReportLogRecord creates a log record for the platform.report event
// returns the log record if successful, otherwise nil
func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord {
// gathering metrics
metrics, ok := record["metrics"].(map[string]interface{})
if !ok {
return nil
}
var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok {
return nil
}
if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok {
return nil
}
if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok {
return nil
}
if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok {
return nil
}

// optionally gather information about cold start time
var initDurationMs float64
if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists {
if val, ok := initDurationMsVal.(float64); ok {
initDurationMs = val
}
}

// gathering requestId
requestId := ""
if requestId, ok = record["requestId"].(string); !ok {
return nil
}

// we have all information available, we can create the log record
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)

// building the body of the log record, optionally adding the init duration
body := fmt.Sprintf(
logReportFmt,
requestId,
durationMs,
billedDurationMs,
memorySizeMB,
maxMemoryUsedMB,
)
if initDurationMs > 0 {
body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
}
logRecord.Body().SetStr(body)

return &logRecord
}

func severityTextToNumber(severityText string) plog.SeverityNumber {
mapping := map[string]plog.SeverityNumber{
"TRACE": plog.SeverityNumberTrace,
Expand Down Expand Up @@ -366,6 +439,7 @@ func newTelemetryAPIReceiver(
port: cfg.Port,
types: subscribedTypes,
resource: r,
logReport: cfg.LogReport,
}, nil
}

Expand Down
238 changes: 238 additions & 0 deletions collector/receiver/telemetryapireceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,244 @@ func TestCreateLogs(t *testing.T) {
}
}

func TestCreateLogsWithLogReport(t *testing.T) {
t.Parallel()

testCases := []struct {
desc string
slice []event
logReport bool
expectedLogRecords int
expectedType string
expectedTimestamp string
expectedBody string
expectedAttributes map[string]interface{}
expectError bool
}{
{
desc: "platform.report with logReport enabled - valid metrics",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": float64(124),
"memorySizeMB": float64(512),
"maxMemoryUsedMB": float64(256),
},
},
},
},
logReport: true,
expectedLogRecords: 1,
expectedType: "platform.report",
expectedTimestamp: "2022-10-12T00:03:50.000Z",
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB",
expectError: false,
},
{
desc: "platform.report with logReport disabled",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": 124,
"memorySizeMB": 512,
"maxMemoryUsedMB": 256,
},
},
},
},
logReport: false,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - missing requestId",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": 124,
"memorySizeMB": 512,
"maxMemoryUsedMB": 256,
},
},
},
},
logReport: false,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - invalid timestamp",
slice: []event{
{
Time: "invalid-timestamp",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": 124,
"memorySizeMB": 512,
"maxMemoryUsedMB": 256,
},
},
},
},
logReport: false,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - missing metrics",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
},
},
},
logReport: false,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - invalid metrics format",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": "invalid",
"billedDurationMs": 124,
"memorySizeMB": 512,
"maxMemoryUsedMB": 256,
},
},
},
},
logReport: false,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - record not a map",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: "invalid record format",
},
},
logReport: true,
expectedLogRecords: 0,
expectError: false,
},
{
desc: "platform.report with logReport enabled - with initDurationMs",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": 124.0,
"memorySizeMB": 512.0,
"maxMemoryUsedMB": 256.0,
"initDurationMs": 50.5,
},
},
},
},
logReport: true,
expectedLogRecords: 1,
expectedType: "platform.report",
expectedTimestamp: "2022-10-12T00:03:50.000Z",
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB Init Duration: 50.50 ms",
expectError: false,
},
{
desc: "platform.report with logReport enabled - with invalid initDurationMs type",
slice: []event{
{
Time: "2022-10-12T00:03:50.000Z",
Type: "platform.report",
Record: map[string]any{
"requestId": "test-request-id-123",
"metrics": map[string]any{
"durationMs": 123.45,
"billedDurationMs": 124.0,
"memorySizeMB": 512.0,
"maxMemoryUsedMB": 256.0,
"initDurationMs": "invalid-string",
},
},
},
},
logReport: true,
expectedLogRecords: 1,
expectedType: "platform.report",
expectedTimestamp: "2022-10-12T00:03:50.000Z",
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB",
expectError: false,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
r, err := newTelemetryAPIReceiver(
&Config{LogReport: tc.logReport},
receivertest.NewNopSettings(Type),
)
require.NoError(t, err)
log, err := r.createLogs(tc.slice)
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, 1, log.ResourceLogs().Len())
resourceLog := log.ResourceLogs().At(0)
require.Equal(t, 1, resourceLog.ScopeLogs().Len())
scopeLog := resourceLog.ScopeLogs().At(0)
require.Equal(t, scopeName, scopeLog.Scope().Name())
require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len())
if scopeLog.LogRecords().Len() > 0 {
logRecord := scopeLog.LogRecords().At(0)
attr, ok := logRecord.Attributes().Get("type")
require.True(t, ok)
require.Equal(t, tc.expectedType, attr.Str())
if tc.expectedTimestamp != "" {
expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp)
require.NoError(t, err)
require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp())
} else {
// For invalid timestamps, no timestamp should be set (zero value)
require.Equal(t, pcommon.Timestamp(0), logRecord.Timestamp())
}
require.Equal(t, tc.expectedBody, logRecord.Body().Str())
}
}
})
}
}

func TestSeverityTextToNumber(t *testing.T) {
t.Parallel()

Expand Down
Loading