From b40e53a452d561c0fba0b7426e58b2c1fa65b241 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 22 Mar 2017 17:01:53 +0100 Subject: [PATCH] Fix reset of transaction functions Transaction functions (`Session#readTransaction()` and `Session#writeTransaction()`) should respect a `Session#reset()` call the same way other statement-running operations do. They shouldn't continue retrying after `#reset()`. This commit makes transaction functions responsive to reset by removing excessive synchronization and adding status code checks so retries do not happen when a transaction is explicitly terminated. --- .../neo4j/driver/internal/NetworkSession.java | 2 +- .../internal/retry/ExponentialBackoff.java | 21 +- .../retry/ExponentialBackoffTest.java | 26 ++ .../driver/v1/integration/SessionIT.java | 416 ++++++++++++++++++ 4 files changed, 463 insertions(+), 2 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index dbb426ac03..d653fb3c33 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -244,7 +244,7 @@ public synchronized void onConnectionError( boolean recoverable ) } } - private synchronized T transaction( AccessMode mode, TransactionWork work ) + private T transaction( AccessMode mode, TransactionWork work ) { RetryDecision decision = null; List errors = null; diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoff.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoff.java index 7cf3d9067c..7bd1911031 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoff.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoff.java @@ -150,6 +150,25 @@ private static boolean canRetryOn( Throwable error ) { return error instanceof SessionExpiredException || error 
instanceof ServiceUnavailableException || - error instanceof TransientException; + isTransientError( error ); + } + + private static boolean isTransientError( Throwable error ) + { + if ( error instanceof TransientException ) + { + String code = ((TransientException) error).code(); + // Retries should not happen when transaction was explicitly terminated by the user. + // Termination of transaction might result in two different error codes depending on where it was + // terminated. These are really client errors but classification on the server is not entirely correct and + // they are classified as transient. + if ( "Neo.TransientError.Transaction.Terminated".equals( code ) || + "Neo.TransientError.Transaction.LockClientStopped".equals( code ) ) + { + return false; + } + return true; + } + return false; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffTest.java index 23d9b0c115..65a7c06be8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffTest.java @@ -253,6 +253,32 @@ public void sleepsOnTransientException() throws Exception verify( clock ).sleep( 1 ); } + @Test + public void doesNothingWhenTransactionTerminatedError() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoff backoff = newBackoff( 1, 1, 1, 0, clock ); + + TransientException exception = new TransientException( "Neo.TransientError.Transaction.Terminated", "" ); + ExponentialBackoffDecision decision = backoff.apply( exception, null ); + + assertFalse( decision.shouldRetry() ); + verify( clock, never() ).sleep( anyLong() ); + } + + @Test + public void doesNothingWhenTransactionLockClientStoppedError() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoff backoff = newBackoff( 1, 1, 1, 0, clock ); + + 
TransientException exception = new TransientException( "Neo.TransientError.Transaction.LockClientStopped", "" ); + ExponentialBackoffDecision decision = backoff.apply( exception, null ); + + assertFalse( decision.shouldRetry() ); + verify( clock, never() ).sleep( anyLong() ); + } + @Test public void throwsWhenSleepInterrupted() throws Exception { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index f093670d69..6fb0903970 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -26,6 +26,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingSettings; @@ -39,15 +47,19 @@ import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.StatementRunner; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.util.DaemonThreadFactory; import org.neo4j.driver.v1.util.ServerVersion; import org.neo4j.driver.v1.util.TestNeo4j; import static java.lang.String.format; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import 
static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -849,6 +861,263 @@ public Integer execute( Transaction tx ) } } + @Test( timeout = 20_000 ) + public void resetShouldStopQueryWaitingForALock() throws Exception + { + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( session, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test( timeout = 20_000 ) + public void resetShouldStopTransactionWaitingForALock() throws Exception + { + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test( timeout = 20_000 ) + public void resetShouldStopWriteTransactionWaitingForALock() throws Exception + { + final AtomicInteger invocationsOfWork = new AtomicInteger(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, final int nodeId, final int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + 
invocationsOfWork.incrementAndGet(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + return null; + } + } ); + } + } + } ); + + assertEquals( 1, invocationsOfWork.get() ); + } + + @Test( timeout = 20_000 ) + public void transactionRunShouldFailOnDeadlocks() throws Exception + { + final int nodeId1 = 42; + final int nodeId2 = 4242; + final int newNodeId1 = 1; + final int newNodeId2 = 2; + + createNodeWithId( nodeId1 ); + createNodeWithId( nodeId2 ); + + final CountDownLatch latch1 = new CountDownLatch( 1 ); + final CountDownLatch latch2 = new CountDownLatch( 1 ); + + try ( final Driver driver = GraphDatabase.driver( neo4j.uri() ) ) + { + Future result1 = executeInDifferentThread( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + + latch1.await(); + latch2.countDown(); + + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + + tx.success(); + } + return null; + } + } ); + + Future result2 = executeInDifferentThread( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + + latch1.countDown(); + latch2.await(); + + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); + + tx.success(); + } + return null; + } + } ); + + boolean firstResultFailed = assertOneOfTwoFuturesFailWithDeadlock( result1, result2 ); + if ( firstResultFailed ) + { + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); + } + else + { + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); + } + } + } + + @Test( timeout = 20_000 ) + public void 
writeTransactionFunctionShouldRetryDeadlocks() throws Exception + { + final int nodeId1 = 42; + final int nodeId2 = 4242; + final int nodeId3 = 424242; + final int newNodeId1 = 1; + final int newNodeId2 = 2; + + createNodeWithId( nodeId1 ); + createNodeWithId( nodeId2 ); + + final CountDownLatch latch1 = new CountDownLatch( 1 ); + final CountDownLatch latch2 = new CountDownLatch( 1 ); + + try ( final Driver driver = GraphDatabase.driver( neo4j.uri() ) ) + { + Future result1 = executeInDifferentThread( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + + latch1.await(); + latch2.countDown(); + + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + + tx.success(); + } + return null; + } + } ); + + Future result2 = executeInDifferentThread( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Session session = driver.session() ) + { + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + + latch1.countDown(); + await( latch2 ); + + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); + + createNodeWithId( nodeId3 ); + + return null; + } + } ); + } + return null; + } + } ); + + boolean firstResultFailed = false; + try + { + // first future may: + // 1) succeed, when its tx was able to grab both locks and tx in other future was + // terminated because of a deadlock + // 2) fail, when its tx was terminated because of a deadlock + assertNull( result1.get( 20, TimeUnit.SECONDS ) ); + } + catch ( ExecutionException e ) + { + firstResultFailed = true; + } + + // second future can't fail because deadlocks are retried + assertNull( result2.get( 20, TimeUnit.SECONDS ) ); + + if ( 
firstResultFailed ) + { + // tx with retries was successful and updated ids + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); + } + else + { + // tx without retries was successful and updated ids + // tx with retries did not manage to find nodes because their ids were updated + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); + } + // tx with retries was successful and created an additional node + assertEquals( 1, countNodesWithId( nodeId3 ) ); + } + } + private void testExecuteReadTx( AccessMode sessionMode ) { Driver driver = neo4j.driver(); @@ -973,6 +1242,153 @@ private static void assumeBookmarkSupport( Driver driver ) serverVersion.greaterThanOrEqual( v3_1_0 ) ); } + @SuppressWarnings( "deprecation" ) + private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception + { + int nodeId = 42; + int newNodeId1 = 4242; + int newNodeId2 = 424242; + + createNodeWithId( nodeId ); + + CountDownLatch nodeLocked = new CountDownLatch( 1 ); + AtomicReference otherSessionRef = new AtomicReference<>(); + + try ( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + Future txResult = nodeIdUpdater.update( driver, nodeId, newNodeId1, otherSessionRef, nodeLocked ); + + StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); + result.consume(); + tx.success(); + + nodeLocked.countDown(); + // give separate thread some time to block on a lock + Thread.sleep( 2_000 ); + otherSessionRef.get().reset(); + + assertTransactionTerminated( txResult ); + } + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n) RETURN n.id AS id" ); + int value = result.single().get( "id" ).asInt(); + assertEquals( newNodeId2, value ); + } + } + + private int countNodesWithId( int id ) + { + try ( Session session = 
neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n {id: {id}}) RETURN count(n)", parameters( "id", id ) ); + return result.single().get( 0 ).asInt(); + } + } + + private void createNodeWithId( int id ) + { + try ( Session session = neo4j.driver().session() ) + { + session.run( "CREATE (n {id: {id}})", parameters( "id", id ) ); + } + } + + private static StatementResult updateNodeId( StatementRunner statementRunner, int currentId, int newId ) + { + return statementRunner.run( "MATCH (n {id: {currentId}}) SET n.id = {newId}", + parameters( "currentId", currentId, "newId", newId ) ); + } + + private static void assertTransactionTerminated( Future work ) throws Exception + { + try + { + work.get( 20, TimeUnit.SECONDS ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( TransientException.class ) ); + assertThat( e.getCause().getMessage(), startsWith( "The transaction has been terminated" ) ); + } + } + + private static boolean assertOneOfTwoFuturesFailWithDeadlock( Future future1, Future future2 ) + throws Exception + { + boolean firstFailed = false; + try + { + assertNull( future1.get( 20, TimeUnit.SECONDS ) ); + } + catch ( ExecutionException e ) + { + assertDeadlockDetectedError( e ); + firstFailed = true; + } + + try + { + assertNull( future2.get( 20, TimeUnit.SECONDS ) ); + } + catch ( ExecutionException e ) + { + assertFalse( "Both futures failed, ", firstFailed ); + assertDeadlockDetectedError( e ); + } + return firstFailed; + } + + private static void assertDeadlockDetectedError( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( TransientException.class ) ); + String errorCode = ((TransientException) e.getCause()).code(); + assertEquals( "Neo.TransientError.Transaction.DeadlockDetected", errorCode ); + } + + private static Future executeInDifferentThread( Callable callable ) + { + ExecutorService executor = newSingleThreadExecutor( new 
DaemonThreadFactory( "test-thread-" ) ); + return executor.submit( callable ); + } + + private static void await( CountDownLatch latch ) + { + try + { + latch.await(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new RuntimeException( e ); + } + } + + private static abstract class NodeIdUpdater + { + final Future update( final Driver driver, final int nodeId, final int newNodeId, + final AtomicReference usedSessionRef, final CountDownLatch latchToWait ) + { + return executeInDifferentThread( new Callable() + { + @Override + public Void call() throws Exception + { + performUpdate( driver, nodeId, newNodeId, usedSessionRef, latchToWait ); + return null; + } + } ); + } + + abstract void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception; + } + private static class ThrowingWork implements TransactionWork { final String query;