Skip to content

feat: add a span around dialing connections #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions rueidisotel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/redis/rueidis"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -58,6 +59,11 @@ type dialMetrics struct {
recordOpts []metric.RecordOption
}

type dialTracer struct {
trace.Tracer
tAttrs trace.SpanStartEventOption
}

// WithHistogramOption sets the HistogramOption.
// If not set, DefaultHistogramBuckets will be used.
func WithHistogramOption(histogramOption HistogramOption) Option {
Expand Down Expand Up @@ -116,7 +122,7 @@ func NewClient(clientOption rueidis.ClientOption, opts ...Option) (rueidis.Clien
return nil, err
}

clientOption.DialCtxFn = trackDialing(metrics, clientOption.DialCtxFn)
clientOption.DialCtxFn = trackDialing(metrics, dialTracer{Tracer: oclient.tracer, tAttrs: oclient.tAttrs}, clientOption.DialCtxFn)

cli, err := rueidis.NewClient(clientOption)
if err != nil {
Expand Down Expand Up @@ -174,16 +180,22 @@ func newClient(opts ...Option) (*otelclient, error) {
return cli, nil
}

func trackDialing(m dialMetrics, dialFn func(context.Context, string, *net.Dialer, *tls.Config) (conn net.Conn, err error)) func(context.Context, string, *net.Dialer, *tls.Config) (conn net.Conn, err error) {
return func(ctx context.Context, network string, dialer *net.Dialer, tlsConfig *tls.Config) (conn net.Conn, err error) {
func trackDialing(m dialMetrics, t dialTracer, dialFn func(context.Context, string, *net.Dialer, *tls.Config) (conn net.Conn, err error)) func(context.Context, string, *net.Dialer, *tls.Config) (conn net.Conn, err error) {
return func(ctx context.Context, dst string, dialer *net.Dialer, tlsConfig *tls.Config) (conn net.Conn, err error) {
ctx, span := t.Start(ctx, "redis.dial", kind, trace.WithAttributes(dbattr, attribute.String("server.address", dst)), t.tAttrs)
defer span.End()

m.attempt.Add(ctx, 1, m.addOpts...)

start := time.Now()

conn, err = dialFn(ctx, network, dialer, tlsConfig)
conn, err = dialFn(ctx, dst, dialer, tlsConfig)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")

// Use floating point division for higher precision (instead of Seconds method).
m.latency.Record(ctx, float64(time.Since(start))/float64(time.Second), m.recordOpts...)
Expand Down
91 changes: 69 additions & 22 deletions rueidisotel/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -338,12 +339,7 @@ func TestWithDBStatement(t *testing.T) {
}
}

func TestWithClientSimple(t *testing.T) {
client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
t.Fatal(err)
}

func TestNewClientSimple(t *testing.T) {
exp := tracetest.NewInMemoryExporter()
tracerProvider := trace.NewTracerProvider(trace.WithSyncer(exp))

Expand All @@ -352,39 +348,43 @@ func TestWithClientSimple(t *testing.T) {

dbStmtFunc := func(cmdTokens []string) string { return strings.Join(cmdTokens, " ") }

client = WithClient(
client,
client, err := NewClient(
rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
TraceAttrs(attribute.String("any", "label")),
MetricAttrs(attribute.String("any", "label")),
WithTracerProvider(tracerProvider),
WithMeterProvider(meterProvider),
WithDBStatement(dbStmtFunc),
WithOperationMetricAttr(),
)
if err != nil {
t.Fatal(err)
}
defer client.Close()

cmd := client.B().Set().Key("key").Value("val").Build()
client.Do(context.Background(), cmd)

// Validate trace
spans := exp.GetSpans().Snapshots()
if len(spans) != 1 {
t.Fatalf("expected 1 span, got %d", len(spans))
if len(spans) < 2 {
t.Fatalf("expected at least 2 spans, got %d", len(spans))
}
span := spans[0]
if span.Name() != "SET" {
t.Fatalf("unexpected span name: got %s, expected %s", span.Name(), "Set")
}
var found bool
for _, attr := range span.Attributes() {
if string(attr.Key) == "any" && attr.Value.AsString() == "label" {
found = true
break
}

commandSpanIdx := slices.IndexFunc(spans, func(span trace.ReadOnlySpan) bool { return span.Name() == "SET" })
if commandSpanIdx == -1 {
t.Fatal("could not find SET span")
}
if !found {
t.Fatalf("expected attribute 'any: label' not found in span attributes")
commandSpan := spans[commandSpanIdx]
validateSpanHasAttribute(t, commandSpan, "any", "label")

dialSpanIdx := slices.IndexFunc(spans, func(span trace.ReadOnlySpan) bool { return span.Name() == "redis.dial" })
if dialSpanIdx == -1 {
t.Fatal("could not find dial span")
}
dialSpan := spans[dialSpanIdx]
validateSpanHasAttribute(t, dialSpan, "server.address", "127.0.0.1:6379")
validateSpanHasAttribute(t, dialSpan, "any", "label")

metrics := metricdata.ResourceMetrics{}
if err := mxp.Collect(context.Background(), &metrics); err != nil {
Expand All @@ -395,6 +395,53 @@ func TestWithClientSimple(t *testing.T) {
validateMetricHasAttributes(t, metrics, "rueidis_command_duration_seconds", "operation")
}

func TestNewClientErrorSpan(t *testing.T) {
exp := tracetest.NewInMemoryExporter()
tracerProvider := trace.NewTracerProvider(trace.WithSyncer(exp))

mxp := metric.NewManualReader()
meterProvider := metric.NewMeterProvider(metric.WithReader(mxp))

_, err := NewClient(
rueidis.ClientOption{InitAddress: []string{"256.256.256.256:6379"}},
TraceAttrs(attribute.String("any", "label")),
MetricAttrs(attribute.String("any", "label")),
WithTracerProvider(tracerProvider),
WithMeterProvider(meterProvider),
WithOperationMetricAttr(),
)
if err == nil {
t.Fatal("expected error")
}

spans := exp.GetSpans().Snapshots()
if len(spans) != 1 {
t.Fatalf("expected 1 span, got %d", len(spans))
}
span := spans[0]
if span.Name() != "redis.dial" {
t.Fatalf("expected span name 'redis.dial', got %s", span.Name())
}
validateSpanHasAttribute(t, span, "any", "label")
events := span.Events()
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
event := events[0]
if event.Name != "exception" {
t.Fatalf("expected event name 'exception', got %s", event.Name)
}
}

func validateSpanHasAttribute(t *testing.T, span trace.ReadOnlySpan, key, value string) {
t.Helper()
if !slices.ContainsFunc(span.Attributes(), func(attr attribute.KeyValue) bool {
return string(attr.Key) == key && attr.Value.AsString() == value
}) {
t.Fatalf("expected attribute '%s: %s' not found in span attributes", key, value)
}
}

func validateMetrics(t *testing.T, metrics metricdata.ResourceMetrics, name string, value int64) {
t.Helper()

Expand Down
Loading