From 9a396b49c63b86e7d764e4daf6dbb22499236642 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 10 Feb 2025 09:28:24 +0000 Subject: [PATCH 1/4] chore(spanner): add interceptor --- .../it/ITAsyncTransactionRetryTest.java | 36 +++++++++++ .../connection/it/ITSqlMusicScriptTest.java | 2 + .../connection/it/ITTransactionRetryTest.java | 60 +++++++++++++++++++ 3 files changed, 98 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java index 744d7042df4..e25e376ca22 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java @@ -221,6 +221,8 @@ public void testCommitAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); ApiFuture count = getTestRecordCountAsync(connection); // do an insert ApiFuture updateCount = @@ -253,6 +255,8 @@ public void testInsertAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); ApiFuture count = getTestRecordCountAsync(connection); // indicate that the next statement should abort interceptor.setProbability(1.0); @@ -276,6 +280,8 @@ public void testUpdateAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); ApiFuture count = getTestRecordCountAsync(connection); // insert a test record connection.executeUpdateAsync( @@ -309,6 +315,8 @@ public void testQueryAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert a test record connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')")); @@ -359,6 +367,8 @@ public void testNextCallAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert two test records connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); @@ -392,6 +402,8 @@ public void testMultipleAborts() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); ApiFuture count = getTestRecordCountAsync(connection); // do three inserts which all will abort and retry interceptor.setProbability(1.0); @@ -428,6 +440,8 @@ public void testAbortAfterSelect() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); ApiFuture count = getTestRecordCountAsync(connection); // insert a test record connection.executeUpdateAsync( @@ -504,6 +518,8 @@ public void testAbortWithResultSetHalfway() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert two test records connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); @@ -539,6 +555,8 @@ public void testAbortWithResultSetFullyConsumed() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert two test records connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); @@ -581,6 +599,8 @@ public void testAbortWithConcurrentInsert() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert two test records connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); @@ -632,6 +652,8 @@ public void testAbortWithConcurrentDelete() { AbortInterceptor interceptor = new AbortInterceptor(0); // first insert two test records try (ITConnection connection = createConnection()) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); connection.executeUpdateAsync( @@ -641,6 +663,8 @@ public void testAbortWithConcurrentDelete() { // open a new connection and select the two test records try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // select the test records and consume the entire result set try (AsyncResultSet rs = connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) { @@ -694,6 +718,8 @@ public void testAbortWithConcurrentUpdate() { // open a new connection and select the two test records try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // select the test records and consume the entire result set try (AsyncResultSet rs = connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) { @@ -744,6 +770,8 @@ public void testAbortWithUnseenConcurrentInsert() throws InterruptedException { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert three test records connection.executeUpdateAsync( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); @@ -833,6 +861,8 @@ public void testRetryLargeResultSet() { final long UPDATED_RECORDS = 1000L; AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection()) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert test records for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) { connection.bufferedWrite( @@ -845,6 +875,8 @@ public void testRetryLargeResultSet() { } try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // select the test records and iterate over them try (AsyncResultSet rs = connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) { @@ -867,6 +899,8 @@ public void testRetryLargeResultSet() { // Wait until the entire result set has been consumed. get(finished); } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // Do an update that will abort and retry. interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -898,6 +932,8 @@ public void testRetryHighAbortRate() { AbortInterceptor interceptor = new AbortInterceptor(0.25D); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert test records for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) { connection.bufferedWrite( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java index e7afe957705..745c57cfb2a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java @@ -71,6 +71,8 @@ public void test02_RunAbortedTest() { long numberOfSongs = 0L; AbortInterceptor interceptor = new AbortInterceptor(0.0D); try (ITConnection connection = createConnection(interceptor)) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); connection.setAutocommit(false); connection.setRetryAbortsInternally(true); // Read all data from the different music tables in the transaction diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java index 0cf3abda6bf..67bccf17910 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java @@ -172,6 +172,8 @@ public void testCommitAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // verify that the there is no test record try (ResultSet rs = connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) { @@ -216,6 +218,8 @@ public void testInsertAborted() { assertThat(rs.getLong("C"), is(equalTo(0L))); assertThat(rs.next(), is(false)); } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // indicate that the next statement should abort interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -241,6 +245,8 @@ public void testUpdateAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // verify that the there is no test record try (ResultSet rs = connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) { @@ -284,6 +290,8 @@ public void testQueryAborted() { assertThat(rs.getLong("C"), is(equalTo(0L))); assertThat(rs.next(), is(false)); } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert a test record connection.executeUpdate( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')")); @@ -321,6 +329,8 @@ public void testNextCallAborted() { connection.executeUpdate(Statement.of("INSERT INTO TEST (ID, NAME) VALUES (2, 'test 2')")); // do a query try (ResultSet rs = connection.executeQuery(Statement.of("SELECT * FROM TEST ORDER BY ID"))) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // the first record should be accessible without any problems assertThat(rs.next(), is(true)); assertThat(rs.getLong("ID"), is(equalTo(1L))); @@ -358,6 +368,8 @@ public void testMultipleAborts() { assertThat(rs.getLong("C"), is(equalTo(0L))); assertThat(rs.next(), is(false)); } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // do three inserts which all will abort and retry interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -405,6 +417,8 @@ public void testAbortAfterSelect() { assertThat(rs.getString("NAME"), is(equalTo("test 1"))); assertThat(rs.next(), is(false)); } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // do another insert that will abort and retry interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -439,6 +453,8 @@ public void testAbortWithResultSetHalfway() { // iterate one step assertThat(rs.next(), is(true)); assertThat(rs.getLong("ID"), is(equalTo(1L))); + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // do another insert that will abort and retry interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -475,6 +491,8 @@ public void testAbortWithResultSetFullyConsumed() { // do nothing, just consume the result set } } + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // do another insert that will abort and retry interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -512,6 +530,8 @@ public void testAbortWithConcurrentInsert() { } // now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -551,6 +571,8 @@ public void testAbortWithConcurrentDelete() { } // now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -590,6 +612,8 @@ public void testAbortWithConcurrentUpdate() { } // now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -629,6 +653,8 @@ public void testAbortWithUnseenConcurrentInsert() { connection2.commit(); } // now try to do an insert that will abort. The retry should still succeed. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); int currentRetryCount = RETRY_STATISTICS.totalRetryAttemptsStarted; @@ -714,6 +740,8 @@ private int testAbortWithUnseenConcurrentInsertAbortOnNext(int callsToNext) // First verify that the transaction has not yet retried. int currentRetryCount = RETRY_STATISTICS.totalRetryAttemptsStarted; + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); @@ -760,6 +788,8 @@ public void testAbortWithConcurrentInsertAndContinue() { } // Now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -807,6 +837,8 @@ protected boolean shouldAbort(String statement, ExecutionStep step) { }; try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); connection.executeUpdate( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')")); connection.commit(); @@ -852,6 +884,8 @@ protected boolean shouldAbort(String statement, ExecutionStep step) { }; try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); connection.executeUpdate( Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')")); connection.commit(); @@ -906,6 +940,8 @@ protected boolean shouldAbort(String statement, ExecutionStep step) { }; try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // Insert two test records. connection.executeUpdate(Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')")); connection.executeUpdate(Statement.of("INSERT INTO TEST (ID, NAME) VALUES (2, 'test 2')")); @@ -986,6 +1022,8 @@ protected boolean shouldAbort(String statement, ExecutionStep step) { } // Now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -1034,6 +1072,8 @@ public void testAbortWithDifferentUpdateCount() { } // Now try to do an insert that will abort. The retry should now fail as there has been a // concurrent modification. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; @@ -1089,6 +1129,8 @@ public void testAbortWithExceptionOnSelect() { } } // now try to do an insert that will abort. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); connection.executeUpdate(Statement.of("INSERT INTO TEST (ID, NAME) VALUES (3, 'test 3')")); @@ -1147,6 +1189,8 @@ public void testAbortWithExceptionOnSelectAndConcurrentModification() { } // Now try to do an insert that will abort. The subsequent retry will fail as the SELECT * // FROM FOO now returns a result. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); try { @@ -1213,6 +1257,8 @@ public void testAbortWithExceptionOnInsertAndConcurrentModification() { } // Now try to do an insert that will abort. The subsequent retry will fail as the INSERT INTO // FOO now succeeds. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); try { @@ -1281,6 +1327,8 @@ public void testAbortWithDroppedTableConcurrentModification() { } // Now try to do an insert that will abort. The subsequent retry will fail as the SELECT * // FROM FOO now fails. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); try { @@ -1341,6 +1389,8 @@ public void testAbortWithInsertOnDroppedTableConcurrentModification() { } // Now try to do an insert that will abort. The subsequent retry will fail as the INSERT INTO // FOO now fails. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); try { @@ -1402,6 +1452,8 @@ public void testAbortWithCursorHalfwayDroppedTableConcurrentModification() { connection2.execute(Statement.of("DROP TABLE FOO")); } // try to continue to consume the result set, but this will now abort. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); try { @@ -1443,6 +1495,8 @@ public void testRetryLargeResultSet() { } } // Do an update that will abort and retry. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); connection.executeUpdate( @@ -1473,6 +1527,8 @@ public void testRetryHighAbortRate() { AbortInterceptor interceptor = new AbortInterceptor(0.25D); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // insert test records for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) { connection.bufferedWrite( @@ -1539,6 +1595,8 @@ public void testAbortWithConcurrentInsertOnEmptyTable() { } // Now try to consume the result set, but the call to next() will throw an AbortedException. // The retry should still succeed. + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); int currentSuccessfulRetryCount = RETRY_STATISTICS.totalSuccessfulRetries; @@ -1563,6 +1621,8 @@ public void testAbortWithConcurrentInsertOnEmptyTable() { connection2.commit(); } // this time the abort will occur on the call to commit() + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); interceptor.setProbability(1.0); interceptor.setOnlyInjectOnce(true); boolean expectedException = false; From d547920b9200b224e16b348351f2f99984a93ad2 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 10 Feb 2025 09:30:28 +0000 Subject: [PATCH 2/4] chore(spanner): assert INVALID_ARGUMENT error code --- .../google/cloud/spanner/it/ITJsonWriteReadTest.java | 9 ++++++++- .../java/com/google/cloud/spanner/it/ITWriteTest.java | 11 +++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITJsonWriteReadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITJsonWriteReadTest.java index e355eaa07a3..026e3649b2e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITJsonWriteReadTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITJsonWriteReadTest.java @@ -132,7 +132,14 @@ public void testWriteAndReadInvalidJsonValues() throws IOException { .to(Value.json(jsonStr)) .build()))); - assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + if (env.getTestHelper() + .getOptions() + .getSessionPoolOptions() + .getUseMultiplexedSessionForRW()) { + assertEquals(ErrorCode.INVALID_ARGUMENT, exception.getErrorCode()); + } else { + assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); + } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java index c5eb9284479..54eee48f4f6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java @@ -1063,8 +1063,15 @@ public void incorrectType() { write(baseInsert().set("StringValue").to(1.234).build()); fail("Expected exception"); } catch (SpannerException ex) { - assertThat(ex.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); - assertThat(ex.getMessage()).contains("STRING"); + if (env.getTestHelper() + .getOptions() + .getSessionPoolOptions() + .getUseMultiplexedSessionForRW()) { + assertThat(ex.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + } else { + assertThat(ex.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); + assertThat(ex.getMessage()).contains("STRING"); + } } } From 4c57a82def3001e53231aa9be4fd85cef7d7246d Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 10 Feb 2025 09:31:01 +0000 Subject: [PATCH 3/4] chore(spanner): update interceptor --- .../spanner/connection/ITAbstractSpannerTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java index 7bf6a670d9c..988263af9f0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java @@ -22,6 +22,7 @@ import com.google.cloud.spanner.GceTestEnvConfig; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; @@ -146,6 +147,9 @@ public void intercept( if (usingMultiplexedsession) { Field stateField = cls.getDeclaredField("txnState"); stateField.setAccessible(true); + if (tx.getState() == null) { + return; + } tx.rollback(); stateField.set(tx, TransactionState.ABORTED); } else { @@ -368,4 +372,11 @@ protected boolean indexExists(Connection connection, String table, String index) } return false; } + + protected boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } From 3ecd2a3616351d2f65b0051d568320ac3aa6e72a Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 10 Feb 2025 09:32:59 +0000 Subject: [PATCH 4/4] chore(spanner): revert precommit token check on mockspanner --- .../com/google/cloud/spanner/MockSpannerServiceImpl.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 35c2d553b08..3443be192e6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -2024,14 +2024,6 @@ public void commit(CommitRequest request, StreamObserver respons return; } sessionLastUsed.put(session.getName(), Instant.now()); - if (session.getMultiplexed() - && !request.hasPrecommitToken() - && !request.hasSingleUseTransaction()) { - throw Status.INVALID_ARGUMENT - .withDescription( - "A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.") - .asRuntimeException(); - } try { commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction