diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 145ad67f827..67b0638f5d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -641,8 +641,8 @@ private ResultSet executeQueryInternal( *
  • Specific {@link QueryOptions} passed in for this query. *
  • Any value specified in a valid environment variable when the {@link SpannerOptions} * instance was created. - *
  • The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database - * where the query is executed. + *
  • The default {@link SpannerOptions#getDefaultQueryOptions(DatabaseId)} ()} specified for + * the database where the query is executed. * */ @VisibleForTesting diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 06237131458..a33f39d47fd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -22,6 +22,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -414,6 +415,7 @@ ServerStream batchWriteAtLeastOnce( * applied to any other requests on the transaction. *
  • {@link Options#commitStats()}: Request that the server includes commit statistics in the * {@link CommitResponse}. + *
  • {@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction * */ TransactionRunner readWriteTransaction(TransactionOption... options); @@ -454,6 +456,7 @@ ServerStream batchWriteAtLeastOnce( * applied to any other requests on the transaction. *
  • {@link Options#commitStats()}: Request that the server includes commit statistics in the * {@link CommitResponse}. + *
  • {@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction * */ TransactionManager transactionManager(TransactionOption... options); @@ -494,6 +497,7 @@ ServerStream batchWriteAtLeastOnce( * applied to any other requests on the transaction. *
  • {@link Options#commitStats()}: Request that the server includes commit statistics in the * {@link CommitResponse}. + *
  • {@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction * */ AsyncRunner runAsync(TransactionOption... options); @@ -548,6 +552,7 @@ ServerStream batchWriteAtLeastOnce( * applied to any other requests on the transaction. *
  • {@link Options#commitStats()}: Request that the server includes commit statistics in the * {@link CommitResponse}. + *
  • {@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction * */ AsyncTransactionManager transactionManagerAsync(TransactionOption... options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index c8c588f813a..c36f1902648 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -21,6 +21,7 @@ import com.google.spanner.v1.ReadRequest.LockHint; import com.google.spanner.v1.ReadRequest.OrderBy; import com.google.spanner.v1.RequestOptions.Priority; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; import java.io.Serializable; import java.time.Duration; import java.util.Objects; @@ -159,6 +160,13 @@ public static TransactionOption optimisticLock() { return OPTIMISTIC_LOCK_OPTION; } + /** + * Specifying this instructs the transaction to request {@link IsolationLevel} from the backend. + */ + public static TransactionOption isolationLevel(IsolationLevel isolationLevel) { + return new IsolationLevelOption(isolationLevel); + } + /** * Specifying this instructs the transaction to be excluded from being recorded in change streams * with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from @@ -490,6 +498,20 @@ void appendToOptions(Options options) { } } + /** Option to set isolation level for read/write transactions. */ + static final class IsolationLevelOption extends InternalOption implements TransactionOption { + private final IsolationLevel isolationLevel; + + public IsolationLevelOption(IsolationLevel isolationLevel) { + this.isolationLevel = isolationLevel; + } + + @Override + void appendToOptions(Options options) { + options.isolationLevel = isolationLevel; + } + } + private boolean withCommitStats; private Duration maxCommitDelay; @@ -512,6 +534,7 @@ void appendToOptions(Options options) { private RpcOrderBy orderBy; private RpcLockHint lockHint; private Boolean lastStatement; + private IsolationLevel isolationLevel; // Construction is via factory methods below. private Options() {} @@ -664,6 +687,10 @@ LockHint lockHint() { return lockHint == null ? null : lockHint.proto; } + IsolationLevel isolationLevel() { + return isolationLevel; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); @@ -726,6 +753,9 @@ public String toString() { if (lockHint != null) { b.append("lockHint: ").append(lockHint).append(' '); } + if (isolationLevel != null) { + b.append("isolationLevel: ").append(isolationLevel).append(' '); + } return b.toString(); } @@ -767,7 +797,8 @@ public boolean equals(Object o) { && Objects.equals(directedReadOptions(), that.directedReadOptions()) && Objects.equals(orderBy(), that.orderBy()) && Objects.equals(isLastStatement(), that.isLastStatement()) - && Objects.equals(lockHint(), that.lockHint()); + && Objects.equals(lockHint(), that.lockHint()) + && Objects.equals(isolationLevel(), that.isolationLevel()); } @Override @@ -833,6 +864,9 @@ public int hashCode() { if (lockHint != null) { result = 31 * result + lockHint.hashCode(); } + if (isolationLevel != null) { + result = 31 * result + isolationLevel.hashCode(); + } return result; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 454709275f8..3da558bf8a9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -83,6 +83,9 @@ static TransactionOptions createReadWriteTransactionOptions( && previousTransactionId != com.google.protobuf.ByteString.EMPTY) { readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId); } + if (options.isolationLevel() != null) { + transactionOptions.setIsolationLevel(options.isolationLevel()); + } transactionOptions.setReadWrite(readWrite); return transactionOptions.build(); } @@ -193,6 +196,10 @@ void markUsed(Instant instant) { sessionReference.markUsed(instant); } + TransactionOptions defaultTransactionOptions() { + return this.spanner.getOptions().getDefaultTransactionOptions(); + } + public DatabaseId getDatabaseId() { return sessionReference.getDatabaseId(); } @@ -252,7 +259,11 @@ public CommitResponse writeAtLeastOnceWithOptions( if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true); } - requestBuilder.setSingleUseTransaction(transactionOptionsBuilder); + if (options.isolationLevel() != null) { + transactionOptionsBuilder.setIsolationLevel(options.isolationLevel()); + } + requestBuilder.setSingleUseTransaction( + defaultTransactionOptions().toBuilder().mergeFrom(transactionOptionsBuilder.build())); if (options.hasMaxCommitDelay()) { requestBuilder.setMaxCommitDelay( @@ -444,7 +455,11 @@ ApiFuture beginTransactionAsync( BeginTransactionRequest.newBuilder() .setSession(getName()) .setOptions( - createReadWriteTransactionOptions(transactionOptions, previousTransactionId)); + defaultTransactionOptions() + .toBuilder() + .mergeFrom( + createReadWriteTransactionOptions( + transactionOptions, previousTransactionId))); if (sessionReference.getIsMultiplexed() && mutation != null) { requestBuilder.setMutationKey(mutation); } @@ -489,7 +504,6 @@ TransactionContextImpl newTransaction(Options options, ByteString previousTransa .setOptions(options) .setTransactionId(null) .setPreviousTransactionId(previousTransactionId) - .setOptions(options) .setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter()) .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId())) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 412bbbb151c..695e156dfc3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -66,6 +66,8 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import com.google.spanner.v1.SpannerGrpc; +import com.google.spanner.v1.TransactionOptions; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; import io.grpc.CallCredentials; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -178,6 +180,7 @@ public class SpannerOptions extends ServiceOptions { private final boolean enableExtendedTracing; private final boolean enableEndToEndTracing; private final String monitoringHost; + private final TransactionOptions defaultTransactionOptions; enum TracingFramework { OPEN_CENSUS, @@ -807,6 +810,7 @@ protected SpannerOptions(Builder builder) { enableBuiltInMetrics = builder.enableBuiltInMetrics; enableEndToEndTracing = builder.enableEndToEndTracing; monitoringHost = builder.monitoringHost; + defaultTransactionOptions = builder.defaultTransactionOptions; } /** @@ -988,6 +992,7 @@ public static class Builder private String monitoringHost = SpannerOptions.environment.getMonitoringHost(); private SslContext mTLSContext = null; private boolean isExperimentalHost = false; + private TransactionOptions defaultTransactionOptions = TransactionOptions.getDefaultInstance(); private static String createCustomClientLibToken(String token) { return token + " " + ServiceOptions.getGoogApiClientLibName(); @@ -1056,6 +1061,7 @@ protected Builder() { this.enableBuiltInMetrics = options.enableBuiltInMetrics; this.enableEndToEndTracing = options.enableEndToEndTracing; this.monitoringHost = options.monitoringHost; + this.defaultTransactionOptions = options.defaultTransactionOptions; } @Override @@ -1645,6 +1651,55 @@ public Builder setEnableEndToEndTracing(boolean enableEndToEndTracing) { return this; } + /** + * Provides the default read-write transaction options for all databases. These defaults are + * overridden by any explicit {@link com.google.cloud.spanner.Options.TransactionOption} + * provided through {@link DatabaseClient}. + * + *

    Example Usage: + * + *

    {@code
    +     * DefaultReadWriteTransactionOptions options = DefaultReadWriteTransactionOptions.newBuilder()
    +     * .setIsolationLevel(IsolationLevel.SERIALIZABLE)
    +     * .build();
    +     * }
    + */ + public static class DefaultReadWriteTransactionOptions { + private final TransactionOptions defaultTransactionOptions; + + private DefaultReadWriteTransactionOptions(TransactionOptions defaultTransactionOptions) { + this.defaultTransactionOptions = defaultTransactionOptions; + } + + public static DefaultReadWriteTransactionOptionsBuilder newBuilder() { + return new DefaultReadWriteTransactionOptionsBuilder(); + } + + public static class DefaultReadWriteTransactionOptionsBuilder { + private final TransactionOptions.Builder transactionOptionsBuilder = + TransactionOptions.newBuilder(); + + public DefaultReadWriteTransactionOptionsBuilder setIsolationLevel( + IsolationLevel isolationLevel) { + transactionOptionsBuilder.setIsolationLevel(isolationLevel); + return this; + } + + public DefaultReadWriteTransactionOptions build() { + return new DefaultReadWriteTransactionOptions(transactionOptionsBuilder.build()); + } + } + } + + /** Sets the {@link DefaultReadWriteTransactionOptions} for read-write transactions. */ + public Builder setDefaultTransactionOptions( + DefaultReadWriteTransactionOptions defaultReadWriteTransactionOptions) { + Preconditions.checkNotNull( + defaultReadWriteTransactionOptions, "DefaultReadWriteTransactionOptions cannot be null"); + this.defaultTransactionOptions = defaultReadWriteTransactionOptions.defaultTransactionOptions; + return this; + } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { @@ -1990,6 +2045,10 @@ String getMonitoringHost() { return monitoringHost; } + public TransactionOptions getDefaultTransactionOptions() { + return defaultTransactionOptions; + } + @BetaApi public boolean isUseVirtualThreads() { return useVirtualThreads; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index fad4ce564ab..038fb4b52eb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -643,8 +643,12 @@ TransactionSelector getTransactionSelector() { if (tx == null) { return TransactionSelector.newBuilder() .setBegin( - SessionImpl.createReadWriteTransactionOptions( - options, getPreviousTransactionId())) + this.session + .defaultTransactionOptions() + .toBuilder() + .mergeFrom( + SessionImpl.createReadWriteTransactionOptions( + options, getPreviousTransactionId()))) .build(); } else { // Wait for the transaction to come available. The tx.get() call will fail with an diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index d502e9b0d5d..0fb4af2e8c7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -49,7 +49,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; -import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.RpcLockHint; @@ -98,6 +97,7 @@ import com.google.spanner.v1.ResultSetStats; import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; import com.google.spanner.v1.Type; import com.google.spanner.v1.TypeAnnotationCode; import com.google.spanner.v1.TypeCode; @@ -1345,10 +1345,7 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi public void testWrite() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - Timestamp timestamp = - client.write( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeInsertMutation(client); assertNotNull(timestamp); List beginTransactions = @@ -1375,10 +1372,7 @@ public void testWriteAborted() { mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - Timestamp timestamp = - client.write( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeInsertMutation(client); assertNotNull(timestamp); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -1394,10 +1388,7 @@ public void testWriteAtLeastOnceAborted() { mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - Timestamp timestamp = - client.writeAtLeastOnce( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeAtLeastOnceInsertMutation(client); assertNotNull(timestamp); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -1408,10 +1399,8 @@ public void testWriteAtLeastOnceAborted() { public void testWriteWithOptions() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.priority(RpcPriority.HIGH)); + MockSpannerTestActions.writeInsertMutationWithOptions( + client, Options.priority(RpcPriority.HIGH)); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -1446,10 +1435,8 @@ public void testWriteWithCommitStats() { public void testWriteWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.excludeTxnFromChangeStreams()); + MockSpannerTestActions.writeInsertMutationWithOptions( + client, Options.excludeTxnFromChangeStreams()); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -1464,10 +1451,7 @@ public void testWriteWithExcludeTxnFromChangeStreams() { public void testWriteAtLeastOnce() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - Timestamp timestamp = - client.writeAtLeastOnce( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeAtLeastOnceInsertMutation(client); assertNotNull(timestamp); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -1507,10 +1491,8 @@ public void testWriteAtLeastOnceWithCommitStats() { public void testWriteAtLeastOnceWithOptions() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.priority(RpcPriority.LOW)); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.priority(RpcPriority.LOW)); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -1526,10 +1508,8 @@ public void testWriteAtLeastOnceWithOptions() { public void testWriteAtLeastOnceWithTagOptions() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.tag("app=spanner,env=test")); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.tag("app=spanner,env=test")); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -1546,10 +1526,8 @@ public void testWriteAtLeastOnceWithTagOptions() { public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.excludeTxnFromChangeStreams()); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.excludeTxnFromChangeStreams()); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -1928,6 +1906,9 @@ public void testReadWriteExecuteReadWithTag() { .isEqualTo("app=spanner,env=test,action=read"); assertThat(request.getRequestOptions().getTransactionTag()) .isEqualTo("app=spanner,env=test,action=txn"); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + request.getTransaction().getBegin().getIsolationLevel()); } @Test @@ -1950,6 +1931,9 @@ public void testExecuteUpdateWithTag() { assertNotNull(request.getTransaction().getBegin()); assertTrue(request.getTransaction().getBegin().hasReadWrite()); assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + request.getTransaction().getBegin().getIsolationLevel()); } @Test @@ -1976,6 +1960,9 @@ public void testBatchUpdateWithTag() { assertNotNull(request.getTransaction().getBegin()); assertTrue(request.getTransaction().getBegin().hasReadWrite()); assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + request.getTransaction().getBegin().getIsolationLevel()); } @Test @@ -2006,13 +1993,8 @@ public void testPartitionedDMLWithTag() { public void testCommitWithTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = - client.readWriteTransaction(Options.tag("app=spanner,env=test,action=commit")); - runner.run( - transaction -> { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - }); + MockSpannerTestActions.commitDeleteTransaction( + client, Options.tag("app=spanner,env=test,action=commit")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2035,12 +2017,8 @@ public void testCommitWithTag() { public void testTransactionManagerCommitWithTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (TransactionManager manager = - client.transactionManager(Options.tag("app=spanner,env=test,action=manager"))) { - TransactionContext transaction = manager.begin(); - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - manager.commit(); - } + MockSpannerTestActions.transactionManagerCommit( + client, Options.tag("app=spanner,env=test,action=manager")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2049,6 +2027,9 @@ public void testTransactionManagerCommitWithTag() { assertNotNull(beginTransaction.getOptions()); assertTrue(beginTransaction.getOptions().hasReadWrite()); assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + beginTransaction.getOptions().getIsolationLevel()); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -2063,14 +2044,8 @@ public void testTransactionManagerCommitWithTag() { public void testAsyncRunnerCommitWithTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - AsyncRunner runner = client.runAsync(Options.tag("app=spanner,env=test,action=runner")); - get( - runner.runAsync( - txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor)); + MockSpannerTestActions.asyncRunnerCommit( + client, executor, Options.tag("app=spanner,env=test,action=runner")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2079,6 +2054,9 @@ public void testAsyncRunnerCommitWithTag() { assertNotNull(beginTransaction.getOptions()); assertTrue(beginTransaction.getOptions().hasReadWrite()); assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + beginTransaction.getOptions().getIsolationLevel()); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -2093,19 +2071,8 @@ public void testAsyncRunnerCommitWithTag() { public void testAsyncTransactionManagerCommitWithTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (AsyncTransactionManager manager = - client.transactionManagerAsync(Options.tag("app=spanner,env=test,action=manager"))) { - TransactionContextFuture transaction = manager.beginAsync(); - get( - transaction - .then( - (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor) - .commitAsync()); - } + MockSpannerTestActions.transactionManagerAsyncCommit( + client, executor, Options.tag("app=spanner,env=test,action=manager")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2114,6 +2081,9 @@ public void testAsyncTransactionManagerCommitWithTag() { assertNotNull(beginTransaction.getOptions()); assertTrue(beginTransaction.getOptions().hasReadWrite()); assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + assertEquals( + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, + beginTransaction.getOptions().getIsolationLevel()); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -2143,8 +2113,8 @@ public void testReadWriteTxnWithExcludeTxnFromChangeStreams_executeUpdate() { public void testReadWriteTxnWithExcludeTxnFromChangeStreams_batchUpdate() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); - runner.run(transaction -> transaction.batchUpdate(Collections.singletonList(UPDATE_STATEMENT))); + MockSpannerTestActions.executeBatchUpdateTransaction( + client, Options.excludeTxnFromChangeStreams()); List requests = mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); @@ -2174,12 +2144,7 @@ public void testPartitionedDMLWithExcludeTxnFromChangeStreams() { public void testCommitWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); - runner.run( - transaction -> { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - }); + MockSpannerTestActions.commitDeleteTransaction(client, Options.excludeTxnFromChangeStreams()); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2194,12 +2159,7 @@ public void testCommitWithExcludeTxnFromChangeStreams() { public void testTransactionManagerCommitWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (TransactionManager manager = - client.transactionManager(Options.excludeTxnFromChangeStreams())) { - TransactionContext transaction = manager.begin(); - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - manager.commit(); - } + MockSpannerTestActions.transactionManagerCommit(client, Options.excludeTxnFromChangeStreams()); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2214,14 +2174,8 @@ public void testTransactionManagerCommitWithExcludeTxnFromChangeStreams() { public void testAsyncRunnerCommitWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - AsyncRunner runner = client.runAsync(Options.excludeTxnFromChangeStreams()); - get( - runner.runAsync( - txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor)); + MockSpannerTestActions.asyncRunnerCommit( + client, executor, Options.excludeTxnFromChangeStreams()); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -2236,19 +2190,8 @@ public void testAsyncRunnerCommitWithExcludeTxnFromChangeStreams() { public void testAsyncTransactionManagerCommitWithExcludeTxnFromChangeStreams() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (AsyncTransactionManager manager = - client.transactionManagerAsync(Options.excludeTxnFromChangeStreams())) { - TransactionContextFuture transaction = manager.beginAsync(); - get( - transaction - .then( - (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor) - .commitAsync()); - } + MockSpannerTestActions.transactionManagerAsyncCommit( + client, executor, Options.excludeTxnFromChangeStreams()); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -4177,12 +4120,7 @@ public void testPartitionedDMLWithPriority() { public void testCommitWithPriority() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = client.readWriteTransaction(Options.priority(RpcPriority.HIGH)); - runner.run( - transaction -> { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - }); + MockSpannerTestActions.commitDeleteTransaction(client, Options.priority(RpcPriority.HIGH)); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4195,12 +4133,7 @@ public void testCommitWithPriority() { public void testTransactionManagerCommitWithPriority() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (TransactionManager manager = - client.transactionManager(Options.priority(RpcPriority.HIGH))) { - TransactionContext transaction = manager.begin(); - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - manager.commit(); - } + MockSpannerTestActions.transactionManagerCommit(client, Options.priority(RpcPriority.HIGH)); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4213,14 +4146,7 @@ public void testTransactionManagerCommitWithPriority() { public void testAsyncRunnerCommitWithPriority() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - AsyncRunner runner = client.runAsync(Options.priority(RpcPriority.HIGH)); - get( - runner.runAsync( - txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor)); + MockSpannerTestActions.asyncRunnerCommit(client, executor, Options.priority(RpcPriority.HIGH)); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4233,19 +4159,8 @@ public void testAsyncRunnerCommitWithPriority() { public void testAsyncTransactionManagerCommitWithPriority() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (AsyncTransactionManager manager = - client.transactionManagerAsync(Options.priority(RpcPriority.HIGH))) { - TransactionContextFuture transaction = manager.beginAsync(); - get( - transaction - .then( - (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor) - .commitAsync()); - } + MockSpannerTestActions.transactionManagerAsyncCommit( + client, executor, Options.priority(RpcPriority.HIGH)); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4258,12 +4173,7 @@ public void testAsyncTransactionManagerCommitWithPriority() { public void testCommitWithoutMaxCommitDelay() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = client.readWriteTransaction(); - runner.run( - transaction -> { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - }); + MockSpannerTestActions.commitDeleteTransaction(client); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4275,13 +4185,8 @@ public void testCommitWithoutMaxCommitDelay() { public void testCommitWithMaxCommitDelay() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = - client.readWriteTransaction(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); - runner.run( - transaction -> { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - }); + MockSpannerTestActions.commitDeleteTransaction( + client, Options.maxCommitDelay(java.time.Duration.ofMillis(100))); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4296,11 +4201,8 @@ public void testCommitWithMaxCommitDelay() { public void testTransactionManagerCommitWithMaxCommitDelay() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionManager manager = - client.transactionManager(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); - TransactionContext transaction = manager.begin(); - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - manager.commit(); + MockSpannerTestActions.transactionManagerCommit( + client, Options.maxCommitDelay(java.time.Duration.ofMillis(100))); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4315,14 +4217,8 @@ public void testTransactionManagerCommitWithMaxCommitDelay() { public void testAsyncRunnerCommitWithMaxCommitDelay() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - AsyncRunner runner = client.runAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); - get( - runner.runAsync( - txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor)); + MockSpannerTestActions.asyncRunnerCommit( + client, executor, Options.maxCommitDelay(java.time.Duration.ofMillis(100))); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); @@ -4337,19 +4233,8 @@ public void testAsyncRunnerCommitWithMaxCommitDelay() { public void testAsyncTransactionManagerCommitWithMaxCommitDelay() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (AsyncTransactionManager manager = - client.transactionManagerAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)))) { - TransactionContextFuture transaction = manager.beginAsync(); - get( - transaction - .then( - (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - executor) - .commitAsync()); - } + MockSpannerTestActions.transactionManagerAsyncCommit( + client, executor, Options.maxCommitDelay(java.time.Duration.ofMillis(100))); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplWithDefaultRWTransactionOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplWithDefaultRWTransactionOptionsTest.java new file mode 100644 index 00000000000..634356f2223 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplWithDefaultRWTransactionOptionsTest.java @@ -0,0 +1,358 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.MockSpannerTestUtil.INVALID_SELECT_STATEMENT; +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1_RESULTSET; +import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_COUNT; +import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.SpannerOptions.Builder.DefaultReadWriteTransactionOptions; +import com.google.protobuf.AbstractMessage; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteBatchDmlRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.function.Consumer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DatabaseClientImplWithDefaultRWTransactionOptionsTest { + private static final TransactionOption SERIALIZABLE_ISOLATION_OPTION = + Options.isolationLevel(IsolationLevel.SERIALIZABLE); + private static final TransactionOption RR_ISOLATION_OPTION = + Options.isolationLevel(IsolationLevel.REPEATABLE_READ); + private static final DatabaseId DATABASE_ID = + DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"); + private static MockSpannerServiceImpl mockSpanner; + private static Server server; + private static ExecutorService executor; + private static LocalChannelProvider channelProvider; + private Spanner spanner; + private Spanner spannerWithRR; + private Spanner spannerWithSerializable; + private DatabaseClient client; + private DatabaseClient clientWithRepeatableReadOption; + private DatabaseClient clientWithSerializableOption; + + @BeforeClass + public static void startStaticServer() throws IOException { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_SELECT_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + mockSpanner.putStatementResult( + StatementResult.read( + "FOO", KeySet.all(), Collections.singletonList("ID"), SELECT1_RESULTSET)); + + String uniqueName = InProcessServerBuilder.generateName(); + executor = Executors.newSingleThreadExecutor(); + server = + InProcessServerBuilder.forName(uniqueName) + // We need to use a real executor for timeouts to occur. + .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) + .addService(mockSpanner) + .build() + .start(); + channelProvider = LocalChannelProvider.create(uniqueName); + } + + @AfterClass + public static void stopServer() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + @Before + public void setUp() { + mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); + SpannerOptions.Builder spannerOptionsBuilder = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + spanner = spannerOptionsBuilder.build().getService(); + spannerWithRR = + spannerOptionsBuilder + .setDefaultTransactionOptions( + DefaultReadWriteTransactionOptions.newBuilder() + .setIsolationLevel(IsolationLevel.REPEATABLE_READ) + .build()) + .build() + .getService(); + spannerWithSerializable = + spannerOptionsBuilder + .setDefaultTransactionOptions( + DefaultReadWriteTransactionOptions.newBuilder() + .setIsolationLevel(IsolationLevel.SERIALIZABLE) + .build()) + .build() + .getService(); + client = spanner.getDatabaseClient(DATABASE_ID); + clientWithRepeatableReadOption = spannerWithRR.getDatabaseClient(DATABASE_ID); + clientWithSerializableOption = spannerWithSerializable.getDatabaseClient(DATABASE_ID); + } + + private void executeTest( + Consumer testAction, IsolationLevel expectedIsolationLevel) { + testAction.accept(client); + validateIsolationLevel(expectedIsolationLevel); + } + + private void executeTestWithRR( + Consumer testAction, IsolationLevel expectedIsolationLevel) { + testAction.accept(clientWithRepeatableReadOption); + validateIsolationLevel(expectedIsolationLevel); + } + + private void executeTestWithSerializable( + Consumer testAction, IsolationLevel expectedIsolationLevel) { + testAction.accept(clientWithSerializableOption); + validateIsolationLevel(expectedIsolationLevel); + } + + @After + public void tearDown() { + spanner.close(); + spannerWithRR.close(); + spannerWithSerializable.close(); + } + + @Test + public void testWrite_WithNoIsolationLevel() { + executeTest( + MockSpannerTestActions::writeInsertMutation, IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED); + } + + @Test + public void testWrite_WithRRSpannerOptions() { + executeTestWithRR(MockSpannerTestActions::writeInsertMutation, IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testWriteWithOptions_WithRRSpannerOptions() { + executeTestWithRR( + c -> + MockSpannerTestActions.writeInsertMutationWithOptions( + c, Options.priority(RpcPriority.HIGH)), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testWriteWithOptions_WithSerializableTxnOption() { + executeTestWithRR( + c -> + MockSpannerTestActions.writeInsertMutationWithOptions(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void testWriteAtLeastOnce_WithSerializableSpannerOptions() { + executeTestWithSerializable( + MockSpannerTestActions::writeAtLeastOnceInsertMutation, IsolationLevel.SERIALIZABLE); + } + + @Test + public void testWriteAtLeastOnceWithOptions_WithRRTxnOption() { + executeTestWithSerializable( + c -> + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testReadWriteTxn_WithRRSpannerOption_batchUpdate() { + executeTestWithRR( + MockSpannerTestActions::executeBatchUpdateTransaction, IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testReadWriteTxn_WithSerializableTxnOption_batchUpdate() { + executeTestWithRR( + c -> MockSpannerTestActions.executeBatchUpdateTransaction(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void testPartitionedDML_WithRRSpannerOption() { + executeTestWithRR( + MockSpannerTestActions::executePartitionedUpdate, + IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED); + } + + @Test + public void testCommit_WithSerializableTxnOption() { + executeTest( + c -> MockSpannerTestActions.commitDeleteTransaction(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void testTransactionManagerCommit_WithRRTxnOption() { + executeTestWithSerializable( + c -> MockSpannerTestActions.transactionManagerCommit(c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testAsyncRunnerCommit_WithRRSpannerOption() { + executeTestWithRR( + c -> MockSpannerTestActions.asyncRunnerCommit(c, executor), IsolationLevel.REPEATABLE_READ); + } + + @Test + public void testAsyncTransactionManagerCommit_WithSerializableTxnOption() { + executeTestWithRR( + c -> + MockSpannerTestActions.transactionManagerAsyncCommit( + c, executor, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void testReadWriteTxn_WithNoOptions() { + executeTest(MockSpannerTestActions::executeSelect1, IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED); + } + + @Test + public void executeSqlWithRWTransactionOptions_RepeatableRead() { + executeTest( + c -> MockSpannerTestActions.executeSelect1(c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void + executeSqlWithDefaultSpannerOptions_SerializableAndRWTransactionOptions_RepeatableRead() { + executeTestWithSerializable( + c -> MockSpannerTestActions.executeSelect1(c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void + executeSqlWithDefaultSpannerOptions_RepeatableReadAndRWTransactionOptions_Serializable() { + executeTestWithRR( + c -> MockSpannerTestActions.executeSelect1(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void executeSqlWithDefaultSpannerOptions_RepeatableReadAndNoRWTransactionOptions() { + executeTestWithRR(MockSpannerTestActions::executeSelect1, IsolationLevel.REPEATABLE_READ); + } + + @Test + public void executeSqlWithRWTransactionOptions_Serializable() { + executeTest( + c -> MockSpannerTestActions.executeSelect1(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void readWithRWTransactionOptions_RepeatableRead() { + executeTest( + c -> MockSpannerTestActions.executeReadFoo(c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void readWithRWTransactionOptions_Serializable() { + executeTest( + c -> MockSpannerTestActions.executeReadFoo(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + @Test + public void beginTransactionWithRWTransactionOptions_RepeatableRead() { + executeTest( + c -> MockSpannerTestActions.executeInvalidAndValidSql(c, RR_ISOLATION_OPTION), + IsolationLevel.REPEATABLE_READ); + } + + @Test + public void beginTransactionWithRWTransactionOptions_Serializable() { + executeTest( + c -> MockSpannerTestActions.executeInvalidAndValidSql(c, SERIALIZABLE_ISOLATION_OPTION), + IsolationLevel.SERIALIZABLE); + } + + private void validateIsolationLevel(IsolationLevel isolationLevel) { + boolean foundMatchingRequest = false; + for (AbstractMessage request : mockSpanner.getRequests()) { + if (request instanceof ExecuteSqlRequest) { + foundMatchingRequest = true; + assertEquals( + ((ExecuteSqlRequest) request).getTransaction().getBegin().getIsolationLevel(), + isolationLevel); + } else if (request instanceof BeginTransactionRequest) { + foundMatchingRequest = true; + assertEquals( + ((BeginTransactionRequest) request).getOptions().getIsolationLevel(), isolationLevel); + } else if (request instanceof ReadRequest) { + foundMatchingRequest = true; + assertEquals( + ((ReadRequest) request).getTransaction().getBegin().getIsolationLevel(), + isolationLevel); + } else if (request instanceof CommitRequest) { + foundMatchingRequest = true; + assertEquals( + ((CommitRequest) request).getSingleUseTransaction().getIsolationLevel(), + isolationLevel); + } else if (request instanceof ExecuteBatchDmlRequest) { + foundMatchingRequest = true; + assertEquals( + ((ExecuteBatchDmlRequest) request).getTransaction().getBegin().getIsolationLevel(), + isolationLevel); + } + if (foundMatchingRequest) { + break; + } + } + assertTrue("No gRPC call is made", foundMatchingRequest); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 94b6de149c3..a70e2a7aa34 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -590,19 +590,7 @@ public void testInlinedBeginFirstQueryReturnsUnavailable() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); - long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + long value = MockSpannerTestActions.executeSelect1(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); @@ -614,20 +602,7 @@ public void testInlinedBeginFirstReadReturnsUnavailable() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); mockSpanner.setStreamingReadExecutionTime( SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0)); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - while (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + Long value = MockSpannerTestActions.executeReadFoo(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ReadRequest.class)).isEqualTo(2); @@ -641,22 +616,7 @@ public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted() { SimulatedExecutionTime.ofExceptions( Arrays.asList( Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - // The second attempt will return ABORTED and should cause the transaction to - // retry. - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - if (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + Long value = MockSpannerTestActions.executeReadFoo(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); assertThat(countRequests(ReadRequest.class)).isEqualTo(3); @@ -670,21 +630,7 @@ public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted() { SimulatedExecutionTime.ofExceptions( Arrays.asList( Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - // The second attempt will return ABORTED and should cause the transaction to - // retry. - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - if (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + Long value = MockSpannerTestActions.executeSelect1(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); @@ -721,24 +667,7 @@ public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted_WithC SimulatedExecutionTime.ofExceptions( Arrays.asList( Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - // The second attempt will return ABORTED and should cause the transaction to - // retry. - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - if (rs.next()) { - return rs.getLong(0); - } - } catch (AbortedException e) { - // Ignore the AbortedException and let the commit handle it. - } - return 0L; - }); + Long value = MockSpannerTestActions.executeReadFoo(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); assertThat(countRequests(ReadRequest.class)).isEqualTo(3); @@ -780,23 +709,7 @@ public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted_WithCa SimulatedExecutionTime.ofExceptions( Arrays.asList( Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException()))); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - // The second attempt will return ABORTED and should cause the transaction to - // retry. - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - if (rs.next()) { - return rs.getLong(0); - } - } catch (AbortedException e) { - // Ignore the AbortedException and let the commit handle it. - } - return 0L; - }); + Long value = MockSpannerTestActions.executeSelect1(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3); @@ -959,20 +872,7 @@ public void testInlinedBeginCommitAfterReadReturnsUnavailable() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - // The first attempt will return UNAVAILABLE and retry internally. - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - if (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + Long value = MockSpannerTestActions.executeReadFoo(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ReadRequest.class)).isEqualTo(1); @@ -1013,18 +913,7 @@ public void testInlinedBeginFirstReadReturnsUnavailableAndCommitAborts() { public void testInlinedBeginTxWithQuery() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); - long updateCount = - client - .readWriteTransaction() - .run( - transaction -> { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + long updateCount = MockSpannerTestActions.executeSelect1(client); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); @@ -1035,19 +924,7 @@ public void testInlinedBeginTxWithQuery() { @Test public void testInlinedBeginTxWithRead() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - long updateCount = - client - .readWriteTransaction() - .run( - transaction -> { - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - while (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + long updateCount = MockSpannerTestActions.executeReadFoo(client); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ReadRequest.class)).isEqualTo(1); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestActions.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestActions.java new file mode 100644 index 00000000000..b7dbacff118 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestActions.java @@ -0,0 +1,158 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.MockSpannerTestUtil.INVALID_SELECT_STATEMENT; +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT; +import static com.google.cloud.spanner.SpannerApiFutures.get; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import com.google.api.core.ApiFutures; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; +import com.google.cloud.spanner.Options.TransactionOption; +import java.util.Collections; +import java.util.concurrent.Executor; + +public class MockSpannerTestActions { + + static final Mutation TEST_MUTATION = + Mutation.newInsertBuilder("foo").set("id").to(1L).set("name").to("bar").build(); + + static Timestamp writeInsertMutation(DatabaseClient client) { + return client.write(Collections.singletonList(TEST_MUTATION)); + } + + static void writeInsertMutationWithOptions(DatabaseClient client, TransactionOption... options) { + client.writeWithOptions(Collections.singletonList(TEST_MUTATION), options); + } + + static Timestamp writeAtLeastOnceInsertMutation(DatabaseClient client) { + return client.writeAtLeastOnce(Collections.singletonList(TEST_MUTATION)); + } + + static void writeAtLeastOnceWithOptionsInsertMutation( + DatabaseClient client, TransactionOption... options) { + client.writeAtLeastOnceWithOptions(Collections.singletonList(TEST_MUTATION), options); + } + + static void executeBatchUpdateTransaction(DatabaseClient client, TransactionOption... options) { + client + .readWriteTransaction(options) + .run(transaction -> transaction.batchUpdate(Collections.singletonList(UPDATE_STATEMENT))); + } + + static void executePartitionedUpdate(DatabaseClient client) { + client.executePartitionedUpdate(UPDATE_STATEMENT); + } + + static void commitDeleteTransaction(DatabaseClient client, TransactionOption... options) { + client + .readWriteTransaction(options) + .run( + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + }); + } + + static void transactionManagerCommit(DatabaseClient client, TransactionOption... options) { + try (TransactionManager manager = client.transactionManager(options)) { + TransactionContext transaction = manager.begin(); + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + manager.commit(); + } + } + + static void asyncRunnerCommit( + DatabaseClient client, Executor executor, TransactionOption... options) { + AsyncRunner runner = client.runAsync(options); + SpannerApiFutures.get( + runner.runAsync( + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor)); + } + + static void transactionManagerAsyncCommit( + DatabaseClient client, Executor executor, TransactionOption... options) { + try (AsyncTransactionManager manager = client.transactionManagerAsync(options)) { + TransactionContextFuture transaction = manager.beginAsync(); + get( + transaction + .then( + (txn, input) -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor) + .commitAsync()); + } + } + + static Long executeSelect1(DatabaseClient client, TransactionOption... options) { + return client + .readWriteTransaction(options) + .run( + transaction -> { + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); + } + } catch (AbortedException e) { + + } + return 0L; + }); + } + + static Long executeReadFoo(DatabaseClient client, TransactionOption... options) { + return client + .readWriteTransaction(options) + .run( + transaction -> { + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { + while (rs.next()) { + return rs.getLong(0); + } + } catch (AbortedException e) { + // Ignore the AbortedException and let the commit handle it. + } + return 0L; + }); + } + + static Long executeInvalidAndValidSql(DatabaseClient client, TransactionOption... options) { + return client + .readWriteTransaction(options) + .run( + transaction -> { + // This query carries the BeginTransaction, but fails. The BeginTransaction will + // then be carried by the subsequent statement. + try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { + SpannerException e = assertThrows(SpannerException.class, () -> rs.next()); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + } + return transaction.executeUpdate(UPDATE_STATEMENT); + }); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java index 83bb1728ac0..e2e012f8ae0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java @@ -50,7 +50,8 @@ public class MockSpannerTestUtil { .setMetadata(SELECT1_METADATA) .build(); public static final Statement SELECT1_FROM_TABLE = Statement.of("SELECT 1 FROM FOO WHERE 1=1"); - + static final Statement INVALID_SELECT_STATEMENT = + Statement.of("SELECT * FROM NON_EXISTENT_TABLE"); static final String TEST_PROJECT = "my-project"; static final String TEST_INSTANCE = "my-instance"; static final String TEST_DATABASE = "my-database"; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index c65bab64603..388539524fd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -345,10 +345,7 @@ public void testWriteAtLeastOnceAborted() { mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - Timestamp timestamp = - client.writeAtLeastOnce( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeAtLeastOnceInsertMutation(client); assertNotNull(timestamp); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -366,10 +363,7 @@ public void testWriteAtLeastOnceAborted() { public void testWriteAtLeastOnce() { DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - Timestamp timestamp = - client.writeAtLeastOnce( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeAtLeastOnceInsertMutation(client); assertNotNull(timestamp); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -419,10 +413,8 @@ public void testWriteAtLeastOnceWithCommitStats() { public void testWriteAtLeastOnceWithOptions() { DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.priority(RpcPriority.LOW)); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.priority(RpcPriority.LOW)); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -443,10 +435,8 @@ public void testWriteAtLeastOnceWithOptions() { public void testWriteAtLeastOnceWithTagOptions() { DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.tag("app=spanner,env=test")); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.tag("app=spanner,env=test")); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -468,10 +458,8 @@ public void testWriteAtLeastOnceWithTagOptions() { public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - client.writeAtLeastOnceWithOptions( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), - Options.excludeTxnFromChangeStreams()); + MockSpannerTestActions.writeAtLeastOnceWithOptionsInsertMutation( + client, Options.excludeTxnFromChangeStreams()); List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); @@ -585,10 +573,7 @@ public void testMutationUsingWrite() { mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - Timestamp timestamp = - client.write( - Collections.singletonList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = MockSpannerTestActions.writeInsertMutation(client); assertNotNull(timestamp); List beginTransactionRequests = @@ -1223,15 +1208,7 @@ public void testMutationOnlyUsingAsyncRunner() { // Test verifies mutation-only case within a R/W transaction via AsyncRunner. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - AsyncRunner runner = client.runAsync(); - get( - runner.runAsync( - txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - MoreExecutors.directExecutor())); - + MockSpannerTestActions.asyncRunnerCommit(client, MoreExecutors.directExecutor()); // Verify that the mutation key is set in BeginTransactionRequest List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); @@ -1255,18 +1232,7 @@ public void testMutationOnlyUsingAsyncTransactionManager() { // Test verifies mutation-only case within a R/W transaction via AsyncTransactionManager. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - try (AsyncTransactionManager manager = client.transactionManagerAsync()) { - TransactionContextFuture transaction = manager.beginAsync(); - get( - transaction - .then( - (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - }, - MoreExecutors.directExecutor()) - .commitAsync()); - } + MockSpannerTestActions.transactionManagerAsyncCommit(client, MoreExecutors.directExecutor()); // Verify that the mutation key is set in BeginTransactionRequest List beginTransactions = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 17c25558f3b..e2bcc92fedc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -28,12 +28,16 @@ import com.google.cloud.spanner.Options.RpcLockHint; import com.google.cloud.spanner.Options.RpcOrderBy; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import com.google.spanner.v1.ReadRequest.LockHint; import com.google.spanner.v1.ReadRequest.OrderBy; import com.google.spanner.v1.RequestOptions.Priority; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; +import com.google.spanner.v1.TransactionOptions.ReadWrite; +import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -117,7 +121,7 @@ public void allOptionsAbsent() { assertThat(options.equals(options)).isTrue(); assertThat(options.equals(null)).isFalse(); assertThat(options.equals(this)).isFalse(); - + assertNull(options.isolationLevel()); assertThat(options.hashCode()).isEqualTo(31); } @@ -375,6 +379,15 @@ public void testTransactionOptionsPriority() { assertEquals("priority: " + priority + " ", options.toString()); } + @Test + public void testTransactionOptionsIsolationLevel() { + Options options = + Options.fromTransactionOptions(Options.isolationLevel(IsolationLevel.REPEATABLE_READ)); + assertEquals(options.isolationLevel(), IsolationLevel.REPEATABLE_READ); + assertEquals( + "isolationLevel: " + IsolationLevel.REPEATABLE_READ.name() + " ", options.toString()); + } + @Test public void testReadOptionsOrderBy() { RpcOrderBy orderBy = RpcOrderBy.NO_ORDER; @@ -772,6 +785,28 @@ public void transactionOptionsExcludeTxnFromChangeStreams() { assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); } + @Test + public void transactionOptionsIsolationLevel() { + Options option1 = + Options.fromTransactionOptions(Options.isolationLevel(IsolationLevel.REPEATABLE_READ)); + Options option2 = + Options.fromTransactionOptions(Options.isolationLevel(IsolationLevel.REPEATABLE_READ)); + Options option3 = Options.fromTransactionOptions(); + + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertEquals(option1.isolationLevel(), IsolationLevel.REPEATABLE_READ); + assertThat(option1.toString()) + .contains("isolationLevel: " + IsolationLevel.REPEATABLE_READ.name()); + + assertNull(option3.isolationLevel()); + assertThat(option3.toString()) + .doesNotContain("isolationLevel: " + IsolationLevel.REPEATABLE_READ.name()); + } + @Test public void updateOptionsExcludeTxnFromChangeStreams() { Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); @@ -807,4 +842,35 @@ public void testLastStatement() { assertNull(option3.isLastStatement()); assertThat(option3.toString()).doesNotContain("lastStatement: true"); } + + @Test + public void testTransactionOptionCombine_WithNoSpannerOptions() { + com.google.spanner.v1.TransactionOptions primaryOptions = + com.google.spanner.v1.TransactionOptions.newBuilder() + .setIsolationLevel(IsolationLevel.SERIALIZABLE) + .setExcludeTxnFromChangeStreams(true) + .setReadWrite(ReadWrite.newBuilder().setReadLockMode(ReadLockMode.PESSIMISTIC)) + .build(); + com.google.spanner.v1.TransactionOptions spannerOptions = + com.google.spanner.v1.TransactionOptions.newBuilder() + .setIsolationLevel(IsolationLevel.REPEATABLE_READ) + .build(); + com.google.spanner.v1.TransactionOptions combinedOptions = + spannerOptions.toBuilder().mergeFrom(primaryOptions).build(); + assertEquals(combinedOptions.getIsolationLevel(), IsolationLevel.SERIALIZABLE); + assertTrue(combinedOptions.getExcludeTxnFromChangeStreams()); + assertEquals( + combinedOptions.getReadWrite(), + ReadWrite.newBuilder().setReadLockMode(ReadLockMode.PESSIMISTIC).build()); + } + + @Test + public void testOptions_WithMultipleDifferentIsolationLevels() { + TransactionOption[] transactionOptions = { + Options.isolationLevel(IsolationLevel.REPEATABLE_READ), + Options.isolationLevel(IsolationLevel.SERIALIZABLE) + }; + Options options = Options.fromTransactionOptions(transactionOptions); + assertEquals(options.isolationLevel(), IsolationLevel.SERIALIZABLE); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java index 225bee86347..aa268382b34 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java @@ -180,18 +180,7 @@ public void singleBatchUpdate() { @Test public void singleQuery() { - Long value = - client - .readWriteTransaction() - .run( - transaction -> { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } - } - return 0L; - }); + Long value = MockSpannerTestActions.executeSelect1(client); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countTransactionsStarted()).isEqualTo(1); @@ -406,17 +395,7 @@ public void failedBatchUpdateAndThenUpdate() { @Test public void executeSqlWithOptimisticConcurrencyControl() { - client - .readWriteTransaction(Options.optimisticLock()) - .run( - transaction -> { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - assertEquals(rs.getLong(0), 1); - } - } - return null; - }); + MockSpannerTestActions.executeSelect1(client, Options.optimisticLock()); Collection requests = mockSpanner.getRequests().stream() .filter(msg -> msg.getClass().equals(ExecuteSqlRequest.class)) @@ -428,18 +407,8 @@ public void executeSqlWithOptimisticConcurrencyControl() { @Test public void readWithOptimisticConcurrencyControl() { - client - .readWriteTransaction(Options.optimisticLock()) - .run( - transaction -> { - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) { - while (rs.next()) { - assertEquals(rs.getLong(0), 1); - } - } - return null; - }); + Long updateCount = MockSpannerTestActions.executeReadFoo(client, Options.optimisticLock()); + assertThat(updateCount).isEqualTo(1L); Collection requests = mockSpanner.getRequests().stream() .filter(msg -> msg.getClass().equals(ReadRequest.class)) @@ -451,20 +420,7 @@ public void readWithOptimisticConcurrencyControl() { @Test public void beginTransactionWithOptimisticConcurrencyControl() { - client - .readWriteTransaction(Options.optimisticLock()) - .run( - transaction -> { - // Instead of adding the BeginTransaction option to the next statement, the client - // library will force a complete retry of the entire transaction, and use an explicit - // BeginTransaction RPC invocation for that transaction in order to include the failed - // statement in the transaction as well. - try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { - SpannerException e = assertThrows(SpannerException.class, () -> rs.next()); - assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); - } - return transaction.executeUpdate(UPDATE_STATEMENT); - }); + MockSpannerTestActions.executeInvalidAndValidSql(client, Options.optimisticLock()); Collection requests = mockSpanner.getRequests().stream() .filter(msg -> msg.getClass().equals(BeginTransactionRequest.class)) @@ -476,19 +432,7 @@ public void beginTransactionWithOptimisticConcurrencyControl() { @Test public void failedQueryAndThenUpdate() { - Long updateCount = - client - .readWriteTransaction() - .run( - transaction -> { - // This query carries the BeginTransaction, but fails. The BeginTransaction will - // then be carried by the subsequent statement. - try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { - SpannerException e = assertThrows(SpannerException.class, () -> rs.next()); - assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); - } - return transaction.executeUpdate(UPDATE_STATEMENT); - }); + Long updateCount = MockSpannerTestActions.executeInvalidAndValidSql(client); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); assertThat(countTransactionsStarted()).isEqualTo(2); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 2a850514d0d..e0403f72d1c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -48,6 +48,7 @@ import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -90,6 +91,8 @@ public static void setupOpenTelemetry() { public void setUp() { MockitoAnnotations.initMocks(this); when(spannerOptions.getNumChannels()).thenReturn(4); + when(spannerOptions.getDefaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); when(spannerOptions.getPrefetchChunks()).thenReturn(1); when(spannerOptions.getDatabaseRole()).thenReturn("role"); when(spannerOptions.getRetrySettings()).thenReturn(RetrySettings.newBuilder().build()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 4249f05c17f..4538be784b1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -88,6 +88,7 @@ import com.google.spanner.v1.ResultSetStats; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; import io.opencensus.metrics.LabelValue; import io.opencensus.metrics.MetricRegistry; import io.opencensus.metrics.Metrics; @@ -1478,6 +1479,8 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn( SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()); final SessionImpl closedSession = mock(SessionImpl.class); + when(closedSession.defaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); when(closedSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-closed"); when(closedSession.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 285097d4af7..9fc065f944c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -37,6 +37,7 @@ import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.TransportOptions; +import com.google.cloud.spanner.SpannerOptions.Builder.DefaultReadWriteTransactionOptions; import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; @@ -61,6 +62,7 @@ import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.SpannerGrpc; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -767,6 +769,24 @@ public void testMonitoringHost() { .isEqualTo(metricsEndpoint); } + @Test + public void testTransactionOptions() { + DefaultReadWriteTransactionOptions transactionOptions = + DefaultReadWriteTransactionOptions.newBuilder() + .setIsolationLevel(IsolationLevel.SERIALIZABLE) + .build(); + assertNotNull( + SpannerOptions.newBuilder().setProjectId("p").build().getDefaultTransactionOptions()); + assertThat( + SpannerOptions.newBuilder() + .setProjectId("p") + .setDefaultTransactionOptions(transactionOptions) + .build() + .getDefaultTransactionOptions() + .getIsolationLevel()) + .isEqualTo(IsolationLevel.SERIALIZABLE); + } + @Test public void testSetDirectedReadOptions() { final DirectedReadOptions directedReadOptions = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index aee3d5ed5b4..547f6b70a22 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -49,6 +49,7 @@ import com.google.spanner.v1.ResultSetStats; import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; import io.opentelemetry.api.OpenTelemetry; import java.util.Collections; import java.util.UUID; @@ -207,6 +208,8 @@ public void commitAfterRollbackFails() { public void usesPreparedTransaction() { SpannerOptions options = mock(SpannerOptions.class); when(options.getNumChannels()).thenReturn(4); + when(options.getDefaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class); when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); @@ -288,6 +291,8 @@ public void inlineBegin() { when(options.getNumChannels()).thenReturn(4); GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class); when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); + when(options.getDefaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); when(options.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = SessionPoolOptions.newBuilder().setMinSessions(0).setIncStep(1).build(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index d8bd6ed448d..3068b38f3ef 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -58,6 +58,7 @@ import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -160,6 +161,8 @@ public void setUp() { public void usesPreparedTransaction() { SpannerOptions options = mock(SpannerOptions.class); when(options.getNumChannels()).thenReturn(4); + when(options.getDefaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class); when(transportOptions.getExecutorFactory()).thenReturn(new TestExecutorFactory()); when(options.getTransportOptions()).thenReturn(transportOptions); @@ -316,7 +319,8 @@ public void batchDmlFailedPrecondition() { public void inlineBegin() { SpannerImpl spanner = mock(SpannerImpl.class); SpannerOptions options = mock(SpannerOptions.class); - + when(options.getDefaultTransactionOptions()) + .thenReturn(TransactionOptions.getDefaultInstance()); when(spanner.getRpc()).thenReturn(rpc); when(spanner.getDefaultQueryOptions(Mockito.any(DatabaseId.class))) .thenReturn(QueryOptions.getDefaultInstance());