From 2cabaa0b86e0e9d4a027101a3077e0cb7348ecb8 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 17 Jul 2024 10:21:15 +0000 Subject: [PATCH 1/3] feat(spanner): set manual affinity incase of gRPC-GCP extenstion --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index b6016f04f78..9a3f0e4bbf7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -56,8 +56,10 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException; @@ -529,6 +531,10 @@ private static String parseGrpcGcpApiConfig() { private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) { GcpManagedChannelOptions grpcGcpOptions = MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions()); + GcpChannelPoolOptions gcpChanelPoolOptions = + GcpChannelPoolOptions.newBuilder() + .setAffinityKeyLifetime(java.time.Duration.ofMinutes(60L)) + .build(); GcpMetricsOptions metricsOptions = MoreObjects.firstNonNull( grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build()); @@ -542,6 +548,7 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions } return GcpManagedChannelOptions.newBuilder(grpcGcpOptions) .withMetricsOptions(metricsOptionsBuilder.build()) + .withChannelPoolOptions(gcpChanelPoolOptions) .build(); } @@ -1950,7 +1957,17 @@ GrpcCallContext newCallContext( boolean routeToLeader) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { + // Set channel affinity in GAX. This is a no-op for gRPC-GCP. context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + + // Set channel affinity in gRPC-GCP. This is a no-op for GAX. + context = + context.withCallOptions( + context + .getCallOptions() + .withOption( + GcpManagedChannel.AFFINITY_KEY, + Option.CHANNEL_HINT.getLong(options).toString())); } if (compressorName != null) { // This sets the compressor for Client -> Server. From e86fd8f19bd58d9f550871a921891a036d7a95e8 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 18 Jul 2024 12:09:11 +0000 Subject: [PATCH 2/3] feat(spanner): convert channel hint to be bounded --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 9a3f0e4bbf7..17a14d3bc31 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -59,7 +59,6 @@ import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; -import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException; @@ -268,6 +267,7 @@ public class GapicSpannerRpc implements SpannerRpc { private static final ConcurrentMap ADMINISTRATIVE_REQUESTS_RATE_LIMITERS = new ConcurrentHashMap<>(); private final boolean leaderAwareRoutingEnabled; + private final int numChannels; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -319,6 +319,7 @@ public GapicSpannerRpc(final SpannerOptions options) { this.callCredentialsProvider = options.getCallCredentialsProvider(); this.compressorName = options.getCompressorName(); this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); + this.numChannels = options.getNumChannels(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -531,10 +532,6 @@ private static String parseGrpcGcpApiConfig() { private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) { GcpManagedChannelOptions grpcGcpOptions = MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions()); - GcpChannelPoolOptions gcpChanelPoolOptions = - GcpChannelPoolOptions.newBuilder() - .setAffinityKeyLifetime(java.time.Duration.ofMinutes(60L)) - .build(); GcpMetricsOptions metricsOptions = MoreObjects.firstNonNull( grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build()); @@ -548,7 +545,6 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions } return GcpManagedChannelOptions.newBuilder(grpcGcpOptions) .withMetricsOptions(metricsOptionsBuilder.build()) - .withChannelPoolOptions(gcpChanelPoolOptions) .build(); } @@ -1961,13 +1957,13 @@ GrpcCallContext newCallContext( context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); // Set channel affinity in gRPC-GCP. This is a no-op for GAX. + // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. + int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; context = context.withCallOptions( context .getCallOptions() - .withOption( - GcpManagedChannel.AFFINITY_KEY, - Option.CHANNEL_HINT.getLong(options).toString())); + .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); } if (compressorName != null) { // This sets the compressor for Client -> Server. From cddc2d3bd879011f3227f19e70aa5b07797a32e1 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 18 Jul 2024 16:07:30 +0000 Subject: [PATCH 3/3] feat(spanner): check if extension is enabled --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 17a14d3bc31..31804cf2808 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -268,6 +268,7 @@ public class GapicSpannerRpc implements SpannerRpc { new ConcurrentHashMap<>(); private final boolean leaderAwareRoutingEnabled; private final int numChannels; + private final boolean isGrpcGcpExtensionEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -320,6 +321,7 @@ public GapicSpannerRpc(final SpannerOptions options) { this.compressorName = options.getCompressorName(); this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); this.numChannels = options.getNumChannels(); + this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -1953,17 +1955,20 @@ GrpcCallContext newCallContext( boolean routeToLeader) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { - // Set channel affinity in GAX. This is a no-op for gRPC-GCP. - context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); - - // Set channel affinity in gRPC-GCP. This is a no-op for GAX. - // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. - int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; - context = - context.withCallOptions( - context - .getCallOptions() - .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); + if (this.isGrpcGcpExtensionEnabled) { + // Set channel affinity in gRPC-GCP. + // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. + int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; + context = + context.withCallOptions( + context + .getCallOptions() + .withOption( + GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); + } else { + // Set channel affinity in GAX. + context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + } } if (compressorName != null) { // This sets the compressor for Client -> Server.