From 8fcef7777d638e1c7140cb95189860c252aab17f Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 2 Jan 2025 12:28:57 +0530 Subject: [PATCH] feat: Improve tracing by adding attributes --- .../cloud/spanner/AbstractReadContext.java | 1 + .../cloud/spanner/DatabaseClientImpl.java | 40 +++++++++++-------- .../google/cloud/spanner/SessionClient.java | 13 ++++-- .../com/google/cloud/spanner/SpannerImpl.java | 9 +++-- .../google/cloud/spanner/TraceWrapper.java | 32 ++++++++++++--- .../IntegrationTestWithClosedSessionsEnv.java | 4 +- 6 files changed, 70 insertions(+), 29 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index cecf462bd25..abcaa41e32c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -960,6 +960,7 @@ ResultSet readInternalWithOptions( SpannerImpl.READ, span, tracer, + tracer.createTableAttributes(table, readOptions), session.getErrorHandler(), rpc.getReadRetrySettings(), rpc.getReadRetryableCodes()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index f571354dacb..7957838e408 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.BatchWriteResponse; +import io.opentelemetry.api.common.Attributes; import javax.annotation.Nullable; class DatabaseClientImpl implements DatabaseClient { @@ -33,6 +34,7 @@ class DatabaseClientImpl implements DatabaseClient { private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction"; private final TraceWrapper tracer; + private Attributes commonAttributes; @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; @@ -50,7 +52,8 @@ class DatabaseClientImpl implements DatabaseClient { /* multiplexedSessionDatabaseClient = */ null, /* useMultiplexedSessionPartitionedOps= */ false, tracer, - /* useMultiplexedSessionForRW = */ false); + /* useMultiplexedSessionForRW = */ false, + Attributes.empty()); } @VisibleForTesting @@ -62,7 +65,8 @@ class DatabaseClientImpl implements DatabaseClient { /* multiplexedSessionDatabaseClient = */ null, /* useMultiplexedSessionPartitionedOps= */ false, tracer, - /* useMultiplexedSessionForRW = */ false); + /* useMultiplexedSessionForRW = */ false, + Attributes.empty()); } DatabaseClientImpl( @@ -72,7 +76,8 @@ class DatabaseClientImpl implements DatabaseClient { @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, boolean useMultiplexedSessionPartitionedOps, TraceWrapper tracer, - boolean useMultiplexedSessionForRW) { + boolean useMultiplexedSessionForRW, + Attributes commonAttributes) { this.clientId = clientId; this.pool = pool; this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; @@ -80,6 +85,7 @@ class DatabaseClientImpl implements DatabaseClient { this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps; this.tracer = tracer; this.useMultiplexedSessionForRW = useMultiplexedSessionForRW; + this.commonAttributes = commonAttributes; } @VisibleForTesting @@ -138,7 +144,7 @@ public Timestamp write(final Iterable mutations) throws SpannerExcepti public CommitResponse writeWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options); @@ -161,7 +167,7 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa public CommitResponse writeAtLeastOnceWithOptions( final Iterable mutations, final TransactionOption... options) throws SpannerException { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient() @@ -181,7 +187,7 @@ public CommitResponse writeAtLeastOnceWithOptions( public ServerStream batchWriteAtLeastOnce( final Iterable mutationGroups, final TransactionOption... options) throws SpannerException { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); } catch (RuntimeException e) { @@ -194,7 +200,7 @@ public ServerStream batchWriteAtLeastOnce( @Override public ReadContext singleUse() { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().singleUse(); } catch (RuntimeException e) { @@ -206,7 +212,7 @@ public ReadContext singleUse() { @Override public ReadContext singleUse(TimestampBound bound) { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().singleUse(bound); } catch (RuntimeException e) { @@ -218,7 +224,7 @@ public ReadContext singleUse(TimestampBound bound) { @Override public ReadOnlyTransaction singleUseReadOnlyTransaction() { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().singleUseReadOnlyTransaction(); } catch (RuntimeException e) { @@ -230,7 +236,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() { @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().singleUseReadOnlyTransaction(bound); } catch (RuntimeException e) { @@ -242,7 +248,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { @Override public ReadOnlyTransaction readOnlyTransaction() { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().readOnlyTransaction(); } catch (RuntimeException e) { @@ -254,7 +260,7 @@ public ReadOnlyTransaction readOnlyTransaction() { @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION); + ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSession().readOnlyTransaction(bound); } catch (RuntimeException e) { @@ -266,7 +272,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSessionForRW().readWriteTransaction(options); } catch (RuntimeException e) { @@ -278,7 +284,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) { @Override public TransactionManager transactionManager(TransactionOption... options) { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSessionForRW().transactionManager(options); } catch (RuntimeException e) { @@ -290,7 +296,7 @@ public TransactionManager transactionManager(TransactionOption... options) { @Override public AsyncRunner runAsync(TransactionOption... options) { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSessionForRW().runAsync(options); } catch (RuntimeException e) { @@ -302,7 +308,7 @@ public AsyncRunner runAsync(TransactionOption... options) { @Override public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { - ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options); + ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { return getMultiplexedSessionForRW().transactionManagerAsync(options); } catch (RuntimeException e) { @@ -322,7 +328,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption... private long executePartitionedUpdateWithPooledSession( final Statement stmt, final UpdateOption... options) { - ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); + ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); } catch (RuntimeException e) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index a3cbbf33826..2edfb66d896 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -125,7 +126,8 @@ private BatchCreateSessionsRunnable( public void run() { List sessions; int remainingSessionsToCreate = sessionCount; - ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS); + ISpan span = + spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { spanner .getTracer() @@ -170,6 +172,7 @@ interface SessionConsumer { private final ExecutorFactory executorFactory; private final ScheduledExecutorService executor; private final DatabaseId db; + private final Attributes commonAttributes; @GuardedBy("this") private volatile long sessionChannelCounter; @@ -182,6 +185,7 @@ interface SessionConsumer { this.db = db; this.executorFactory = executorFactory; this.executor = executorFactory.get(); + this.commonAttributes = spanner.getTracer().createCommonAttributes(db); } @Override @@ -205,7 +209,7 @@ SessionImpl createSession() { synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); } - ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION); + ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { com.google.spanner.v1.Session session = spanner @@ -250,7 +254,10 @@ void createMultiplexedSession(SessionConsumer consumer) { * GRPC channel. In case of an error during the gRPC calls, an exception will be thrown. */ SessionImpl createMultiplexedSession() { - ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); + ISpan span = + spanner + .getTracer() + .spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { com.google.spanner.v1.Session session = spanner diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index ed815c77088..b3eec55f73d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -315,7 +315,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), multiplexedSessionDatabaseClient, getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), - useMultiplexedSessionForRW); + useMultiplexedSessionForRW, + this.tracer.createCommonAttributes(db)); dbClients.put(db, dbClient); return dbClient; } @@ -329,7 +330,8 @@ DatabaseClientImpl createDatabaseClient( boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, boolean useMultiplexedSessionPartitionedOps, - boolean useMultiplexedSessionForRW) { + boolean useMultiplexedSessionForRW, + Attributes commonAttributes) { return new DatabaseClientImpl( clientId, pool, @@ -337,7 +339,8 @@ DatabaseClientImpl createDatabaseClient( multiplexedSessionClient, useMultiplexedSessionPartitionedOps, tracer, - useMultiplexedSessionForRW); + useMultiplexedSessionForRW, + commonAttributes); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java index 02638445ae2..b8d74de98df 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java @@ -38,10 +38,14 @@ class TraceWrapper { AttributeKey.stringKey("transaction.tag"); private static final AttributeKey STATEMENT_TAG_KEY = AttributeKey.stringKey("statement.tag"); + private static final AttributeKey INSTANCE_NAME_KEY = + AttributeKey.stringKey("instance.name"); + private static final AttributeKey DB_NAME_KEY = AttributeKey.stringKey("db.name"); private static final AttributeKey DB_STATEMENT_KEY = AttributeKey.stringKey("db.statement"); private static final AttributeKey> DB_STATEMENT_ARRAY_KEY = AttributeKey.stringArrayKey("db.statement"); + private static final AttributeKey DB_TABLE_NAME_KEY = AttributeKey.stringKey("db.table"); private static final AttributeKey THREAD_NAME_KEY = AttributeKey.stringKey("thread.name"); private final Tracer openCensusTracer; @@ -61,8 +65,8 @@ ISpan spanBuilder(String spanName) { return spanBuilder(spanName, Attributes.empty()); } - ISpan spanBuilder(String spanName, TransactionOption... options) { - return spanBuilder(spanName, createTransactionAttributes(options)); + ISpan spanBuilder(String spanName, Attributes commonAttributes, TransactionOption... options) { + return spanBuilder(spanName, createTransactionAttributes(commonAttributes, options)); } ISpan spanBuilder(String spanName, Attributes attributes) { @@ -137,7 +141,9 @@ IScope withSpan(ISpan span) { } } - Attributes createTransactionAttributes(TransactionOption... options) { + Attributes createTransactionAttributes( + Attributes commonAttributes, TransactionOption... options) { + AttributesBuilder builder = commonAttributes.toBuilder(); if (options != null && options.length > 0) { Optional tagOption = Arrays.stream(options) @@ -145,10 +151,10 @@ Attributes createTransactionAttributes(TransactionOption... options) { .map(option -> (TagOption) option) .findAny(); if (tagOption.isPresent()) { - return Attributes.of(TRANSACTION_TAG_KEY, tagOption.get().getTag()); + builder.put(TRANSACTION_TAG_KEY, tagOption.get().getTag()); } } - return Attributes.empty(); + return builder.build(); } Attributes createStatementAttributes(Statement statement, Options options) { @@ -185,6 +191,22 @@ Attributes createStatementBatchAttributes(Iterable statements, Option return Attributes.empty(); } + Attributes createTableAttributes(String tableName, Options options) { + AttributesBuilder builder = Attributes.builder(); + builder.put(DB_TABLE_NAME_KEY, tableName); + if (options != null && options.hasTag()) { + builder.put(STATEMENT_TAG_KEY, options.tag()); + } + return builder.build(); + } + + Attributes createCommonAttributes(DatabaseId db) { + AttributesBuilder builder = Attributes.builder(); + builder.put(DB_NAME_KEY, db.getDatabase()); + builder.put(INSTANCE_NAME_KEY, db.getInstanceId().getName()); + return builder.build(); + } + private static String getTraceThreadName() { return MoreObjects.firstNonNull( Context.current().get(OpenTelemetryContextKeys.THREAD_NAME_KEY), diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index 72cfe0bfe44..f852fc2903f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -20,6 +20,7 @@ import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.SessionFutureWrapper; import com.google.cloud.spanner.testing.RemoteSpannerHelper; +import io.opentelemetry.api.common.Attributes; /** * Subclass of {@link IntegrationTestEnv} that allows the user to specify when the underlying @@ -52,7 +53,8 @@ DatabaseClientImpl createDatabaseClient( boolean useMultiplexedSessionBlindWriteIgnore, MultiplexedSessionDatabaseClient ignore, boolean useMultiplexedSessionPartitionedOpsIgnore, - boolean useMultiplexedSessionForRWIgnore) { + boolean useMultiplexedSessionForRWIgnore, + Attributes attributes) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } }