diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index 1b94702061c..5f735264e07 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -76,6 +76,7 @@ import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType; import com.google.cloud.spanner.connection.ConnectionProperty.Context; import com.google.cloud.spanner.connection.ConnectionState.Type; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout; import com.google.cloud.spanner.connection.StatementResult.ResultType; import com.google.cloud.spanner.connection.UnitOfWork.CallType; @@ -302,9 +303,17 @@ static UnitOfWorkType of(TransactionMode transactionMode) { Preconditions.checkNotNull(options); this.leakedException = options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null; + StatementExecutorType statementExecutorType; + if (options.getStatementExecutorType() != null) { + statementExecutorType = options.getStatementExecutorType(); + } else { + statementExecutorType = + options.isUseVirtualThreads() + ? StatementExecutorType.VIRTUAL_THREAD + : StatementExecutorType.DIRECT_EXECUTOR; + } this.statementExecutor = - new StatementExecutor( - options.isUseVirtualThreads(), options.getStatementExecutionInterceptors()); + new StatementExecutor(statementExecutorType, options.getStatementExecutionInterceptors()); this.spannerPool = SpannerPool.INSTANCE; this.options = options; this.spanner = spannerPool.getSpanner(options, this); @@ -348,7 +357,11 @@ && getDialect() == Dialect.POSTGRESQL this.leakedException = options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null; this.statementExecutor = - new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList()); + new StatementExecutor( + options.isUseVirtualThreads() + ? StatementExecutorType.VIRTUAL_THREAD + : StatementExecutorType.DIRECT_EXECUTOR, + Collections.emptyList()); this.spannerPool = Preconditions.checkNotNull(spannerPool); this.options = Preconditions.checkNotNull(options); this.spanner = spannerPool.getSpanner(options, this); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java index 66b09c35afd..2be2d7980b2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java @@ -70,6 +70,7 @@ import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -614,6 +615,7 @@ public static class Builder { new HashMap<>(); private String uri; private Credentials credentials; + private StatementExecutorType statementExecutorType; private SessionPoolOptions sessionPoolOptions; private List statementExecutionInterceptors = Collections.emptyList(); @@ -777,6 +779,11 @@ Builder setCredentials(Credentials credentials) { return this; } + Builder setStatementExecutorType(StatementExecutorType statementExecutorType) { + this.statementExecutorType = statementExecutorType; + return this; + } + public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; return this; @@ -814,6 +821,7 @@ public static Builder newBuilder() { private final String instanceId; private final String databaseName; private final Credentials credentials; + private final StatementExecutorType statementExecutorType; private final SessionPoolOptions sessionPoolOptions; private final OpenTelemetry openTelemetry; @@ -834,6 +842,7 @@ private ConnectionOptions(Builder builder) { ConnectionPropertyValue value = cast(connectionPropertyValues.get(LENIENT.getKey())); this.warnings = checkValidProperties(value != null && value.getValue(), uri); this.fixedCredentials = builder.credentials; + this.statementExecutorType = builder.statementExecutorType; this.openTelemetry = builder.openTelemetry; this.statementExecutionInterceptors = @@ -1105,6 +1114,10 @@ CredentialsProvider getCredentialsProvider() { return getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER); } + StatementExecutorType getStatementExecutorType() { + return this.statementExecutorType; + } + /** The {@link SessionPoolOptions} of this {@link ConnectionOptions}. */ public SessionPoolOptions getSessionPoolOptions() { return sessionPoolOptions; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java index b92e575a047..7340834a926 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java @@ -146,7 +146,10 @@ java.time.Duration asDuration() { ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", false); /** Creates an {@link ExecutorService} for a {@link StatementExecutor}. */ - private static ListeningExecutorService createExecutorService(boolean useVirtualThreads) { + private static ListeningExecutorService createExecutorService(StatementExecutorType type) { + if (type == StatementExecutorType.DIRECT_EXECUTOR) { + return MoreExecutors.newDirectExecutorService(); + } return MoreExecutors.listeningDecorator( Context.taskWrapping( new ThreadPoolExecutor( @@ -155,7 +158,7 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - useVirtualThreads + type == StatementExecutorType.VIRTUAL_THREAD ? DEFAULT_VIRTUAL_THREAD_FACTORY : DEFAULT_DAEMON_THREAD_FACTORY))); } @@ -168,13 +171,23 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual */ private final List interceptors; + enum StatementExecutorType { + PLATFORM_THREAD, + VIRTUAL_THREAD, + DIRECT_EXECUTOR, + } + @VisibleForTesting StatementExecutor() { - this(DEFAULT_USE_VIRTUAL_THREADS, Collections.emptyList()); + this( + DEFAULT_USE_VIRTUAL_THREADS + ? StatementExecutorType.VIRTUAL_THREAD + : StatementExecutorType.PLATFORM_THREAD, + Collections.emptyList()); } - StatementExecutor(boolean useVirtualThreads, List interceptors) { - this.executor = createExecutorService(useVirtualThreads); + StatementExecutor(StatementExecutorType type, List interceptors) { + this.executor = createExecutorService(type); this.interceptors = Collections.unmodifiableList(interceptors); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java index 375fdda2a95..fa3ab00b138 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java @@ -283,6 +283,7 @@ ITConnection createConnection( ConnectionOptions.newBuilder() .setUri(getBaseUrl() + additionalUrlOptions) .setStatementExecutionInterceptors(interceptors); + configureConnectionOptions(builder); ConnectionOptions options = builder.build(); ITConnection connection = createITConnection(options); for (TransactionRetryListener listener : transactionRetryListeners) { @@ -291,6 +292,11 @@ ITConnection createConnection( return connection; } + protected ConnectionOptions.Builder configureConnectionOptions( + ConnectionOptions.Builder builder) { + return builder; + } + protected String getBaseUrl() { return String.format( "cloudspanner://localhost:%d/projects/proj/instances/inst/databases/db?usePlainText=true;autocommit=false;retryAbortsInternally=true", diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java index 97d745edd7f..1f4378ecd86 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java @@ -32,7 +32,9 @@ import com.google.cloud.spanner.Options; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.connection.ConnectionOptions.Builder; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -127,6 +129,11 @@ ITConnection createConnection(TransactionRetryListener listener) { return connection; } + @Override + protected Builder configureConnectionOptions(Builder builder) { + return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD); + } + @Test public void testSingleQueryAborted() { RetryCounter counter = new RetryCounter(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java index 21fe086d4f8..c6777da5b51 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java @@ -36,7 +36,9 @@ import com.google.cloud.spanner.SpannerApiFutures; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.connection.ConnectionOptions.Builder; import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.cloud.spanner.connection.StatementResult.ResultType; import com.google.common.base.Function; import com.google.common.collect.Collections2; @@ -86,6 +88,11 @@ public void setup() { } } + @Override + protected Builder configureConnectionOptions(Builder builder) { + return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD); + } + @After public void reset() { mockSpanner.removeAllExecutionTimes(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java index c5b34982255..9ae174bd403 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java @@ -36,6 +36,8 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.connection.ConnectionOptions.Builder; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.common.collect.ImmutableList; import com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.CommitRequest; @@ -417,6 +419,11 @@ protected String getBaseUrl() { return super.getBaseUrl() + ";maxSessions=1"; } + @Override + protected Builder configureConnectionOptions(Builder builder) { + return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD); + } + @Test public void testMaxSessions() throws InterruptedException, TimeoutException, ExecutionException { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java index f55c55c6c93..a50fb98f1e3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; import com.google.api.core.SettableApiFuture; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; @@ -35,6 +36,7 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.AbstractConnectionImplTest.ConnectionConsumer; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; +import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.common.base.Stopwatch; import com.google.common.collect.Collections2; import com.google.longrunning.Operation; @@ -58,9 +60,11 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class StatementTimeoutTest extends AbstractMockServerTest { private static final String SLOW_SELECT = "SELECT foo FROM bar"; @@ -85,10 +89,18 @@ public class StatementTimeoutTest extends AbstractMockServerTest { */ private static final int TIMEOUT_FOR_SLOW_STATEMENTS = 50; + @Parameters(name = "statementExecutorType = {0}") + public static Object[] parameters() { + return StatementExecutorType.values(); + } + + @Parameter public StatementExecutorType statementExecutorType; + protected ITConnection createConnection() { ConnectionOptions options = ConnectionOptions.newBuilder() .setUri(getBaseUrl() + ";trackSessionLeaks=false") + .setStatementExecutorType(statementExecutorType) .setConfigurator( optionsConfigurator -> optionsConfigurator @@ -618,6 +630,10 @@ private void waitForDdlRequestOnServer() { @Test public void testCancelReadOnlyAutocommit() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -643,6 +659,10 @@ public void testCancelReadOnlyAutocommit() { @Test public void testCancelReadOnlyAutocommitMultipleStatements() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -675,6 +695,10 @@ public void testCancelReadOnlyAutocommitMultipleStatements() { @Test public void testCancelReadOnlyTransactional() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -700,6 +724,10 @@ public void testCancelReadOnlyTransactional() { @Test public void testCancelReadOnlyTransactionalMultipleStatements() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -737,6 +765,10 @@ public void testCancelReadOnlyTransactionalMultipleStatements() { @Test public void testCancelReadWriteAutocommit() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -761,6 +793,10 @@ public void testCancelReadWriteAutocommit() { @Test public void testCancelReadWriteAutocommitMultipleStatements() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -792,6 +828,10 @@ public void testCancelReadWriteAutocommitMultipleStatements() { @Test public void testCancelReadWriteAutocommitSlowUpdate() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -815,6 +855,10 @@ public void testCancelReadWriteAutocommitSlowUpdate() { @Test public void testCancelReadWriteAutocommitSlowCommit() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -838,6 +882,10 @@ public void testCancelReadWriteAutocommitSlowCommit() { @Test public void testCancelReadWriteTransactional() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -862,6 +910,10 @@ public void testCancelReadWriteTransactional() { @Test public void testCancelReadWriteTransactionalMultipleStatements() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTime(EXECUTION_TIME_SLOW_STATEMENT, 0)); @@ -928,6 +980,10 @@ static void addMockDdlOperations(int count, boolean done) { @Test public void testCancelDdlBatch() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + addSlowMockDdlOperation(); try (Connection connection = createConnection()) { @@ -951,6 +1007,10 @@ public void testCancelDdlBatch() { @Test public void testCancelDdlAutocommit() { + assumeFalse( + "Direct executor does not yet support cancelling statements", + statementExecutorType == StatementExecutorType.DIRECT_EXECUTOR); + addSlowMockDdlOperation(); try (Connection connection = createConnection()) {