diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index 5e7b3e294f..138afffe1a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -46,7 +46,7 @@ protected void finalize() throws Throwable private void logLeakIfNeeded() { - Boolean isOpen = Futures.getBlocking( currentConnectionIsOpen() ); + Boolean isOpen = Futures.getBlocking( hasOpenConnection() ); if ( isOpen ) { logger.error( "Neo4j Session object leaked, please ensure that your application" + diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index e61ce516fe..0a6030195a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -65,7 +65,8 @@ public class NetworkSession implements Session private volatile Bookmark bookmark = Bookmark.empty(); private volatile CompletionStage transactionStage = completedFuture( null ); - private volatile CompletionStage connectionStage = completedFuture( null ); + private volatile CompletionStage readConnectionStage = completedFuture( null ); + private volatile CompletionStage writeConnectionStage = completedFuture( null ); private final AtomicBoolean open = new AtomicBoolean( true ); @@ -250,14 +251,13 @@ public void reset() private CompletionStage resetAsync() { - return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() ) - .thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ); + return existingTransactionOrNull().thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ).thenCompose( ignore -> releaseConnections() ); } @Override @@ -266,12 +266,14 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - CompletionStage currentConnectionIsOpen() + CompletionStage hasOpenConnection() + { + return hasOpenConnection( readConnectionStage ).thenCompose( readConnectionOpen -> + readConnectionOpen ? completedFuture( true ) : hasOpenConnection( writeConnectionStage ) ); + } + + private CompletionStage hasOpenConnection( CompletionStage connectionStage ) { - if ( connectionStage == null ) - { - return completedFuture( false ); - } return connectionStage.handle( ( connection, error ) -> error == null && // no acquisition error connection != null && // some connection has actually been acquired @@ -447,10 +449,28 @@ private CompletionStage beginTransactionAsync( AccessMode m private CompletionStage acquireConnection( AccessMode mode ) { - // memorize in local so same instance is transformed and used in callbacks - CompletionStage currentAsyncConnectionStage = connectionStage; + if ( mode == AccessMode.READ ) + { + CompletionStage currentConnectionStage = readConnectionStage; + readConnectionStage = acquireConnection( currentConnectionStage, mode ); + return readConnectionStage; + } + else if ( mode == AccessMode.WRITE ) + { + CompletionStage currentConnectionStage = writeConnectionStage; + writeConnectionStage = acquireConnection( currentConnectionStage, mode ); + return writeConnectionStage; + } + else + { + throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + } + } - connectionStage = currentAsyncConnectionStage + private CompletionStage acquireConnection( CompletionStage currentConnectionStage, + AccessMode mode ) + { + return currentConnectionStage .exceptionally( error -> null ) // handle previous acquisition failures .thenCompose( connection -> { @@ -458,7 +478,7 @@ private CompletionStage acquireConnection( AccessMode mode ) { // previous acquisition attempt was successful and connection has not been released yet // continue using same connection - return currentAsyncConnectionStage; + return currentConnectionStage; } else { @@ -467,13 +487,11 @@ private CompletionStage acquireConnection( AccessMode mode ) return connectionProvider.acquireConnection( mode ); } } ); - - return connectionStage; } private CompletionStage releaseResources() { - return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); + return rollbackTransaction().thenCompose( ignore -> releaseConnections() ); } private CompletionStage rollbackTransaction() @@ -493,16 +511,41 @@ private CompletionStage rollbackTransaction() } ); } - private CompletionStage releaseConnectionNow() + private CompletionStage releaseConnections() + { + CompletableFuture result = new CompletableFuture<>(); + releaseConnection( readConnectionStage ).whenComplete( ( ignore1, readConnectionError ) -> + releaseConnection( writeConnectionStage ).whenComplete( ( ignore2, writeConnectionError ) -> + afterConnectionsReleased( result, readConnectionError, writeConnectionError ) ) ); + return result; + } + + private CompletionStage releaseConnection( CompletionStage connectionStage ) { - return existingConnectionOrNull().thenCompose( connection -> + return connectionStage.exceptionally( error -> null ) + .thenCompose( connection -> connection == null ? completedFuture( null ) : connection.releaseNow() ); + } + + private void afterConnectionsReleased( CompletableFuture result, Throwable readConnectionError, + Throwable writeConnectionError ) + { + if ( readConnectionError != null && writeConnectionError != null ) { - if ( connection != null ) - { - return connection.releaseNow(); - } - return completedFuture( null ); - } ); + readConnectionError.addSuppressed( writeConnectionError ); + result.completeExceptionally( readConnectionError ); + } + else if ( readConnectionError != null ) + { + result.completeExceptionally( readConnectionError ); + } + else if ( writeConnectionError != null ) + { + result.completeExceptionally( writeConnectionError ); + } + else + { + result.complete( null ); + } } private CompletionStage ensureNoOpenTxBeforeRunningQuery() @@ -535,11 +578,6 @@ private CompletionStage existingTransactionOrNull() .thenApply( tx -> tx != null && tx.isOpen() ? tx : null ); } - private CompletionStage existingConnectionOrNull() - { - return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures - } - private void ensureSessionIsOpen() { if ( !open.get() ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 4ed7b2f19e..2baa3d17a0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -34,7 +34,6 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -74,6 +73,7 @@ import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.AccessMode.WRITE; import static org.neo4j.driver.v1.util.TestUtil.connectionMock; +import static org.neo4j.driver.v1.util.TestUtil.setupSuccessfulPullAll; public class NetworkSessionTest { @@ -88,8 +88,6 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); - when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) .thenReturn( completedFuture( connection ) ); @@ -252,7 +250,7 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun() public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { String query = "RETURN 1"; - setupSuccessfulPullAll( query ); + setupSuccessfulPullAll( connection, query ); session.run( query ); @@ -357,7 +355,7 @@ public void updatesBookmarkWhenTxIsClosed() public void releasesConnectionWhenTxIsClosed() { String query = "RETURN 42"; - setupSuccessfulPullAll( query ); + setupSuccessfulPullAll( connection, query ); Transaction tx = session.beginTransaction(); tx.run( query ); @@ -603,6 +601,25 @@ public void transactionShouldBeRolledBackAfterSessionReset() assertFalse( tx.isOpen() ); } + @Test + public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + { + NetworkSession session = newSession( connectionProvider, READ ); + Transaction tx = session.beginTransaction(); + + assertTrue( tx.isOpen() ); + when( connection.releaseNow() ).then( invocation -> + { + // verify that tx is not open when connection is released + assertFalse( tx.isOpen() ); + return completedFuture( null ); + } ); + + session.reset(); + + verify( connection ).releaseNow(); + } + @Test public void shouldHaveNullLastBookmarkInitially() { @@ -755,6 +772,32 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() verifyBeginTx( connection, times( 1 ) ); } + @Test + public void shouldNotUseReadConnectionForWriteAccessMode() + { + String readQuery = "RETURN 1"; + String writeQuery = "CREATE ()"; + + Connection readConnection = connectionMock(); + Connection writeConnection = connectionMock(); + when( readConnection.tryMarkInUse() ).thenReturn( true ); // allow read connection to always be used + when( writeConnection.tryMarkInUse() ).thenReturn( true ); // allow write connection to always be used + setupSuccessfulPullAll( readConnection, readQuery ); + setupSuccessfulPullAll( writeConnection, writeQuery ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( completedFuture( readConnection ) ); + when( connectionProvider.acquireConnection( WRITE ) ).thenReturn( completedFuture( writeConnection ) ); + + session.run( readQuery ); // query is executed with default READ access mode + session.writeTransaction( tx -> tx.run( writeQuery ) ); // write tx is executed with WRITE access mode + + // auto-commit query and write tx should've used different connections + verifyRunAndFlush( readConnection, writeQuery, never() ); + verifyRunAndFlush( writeConnection, readQuery, never() ); + + verifyRunAndFlush( readConnection, readQuery, times( 1 ) ); + verifyRunAndFlush( writeConnection, writeQuery, times( 1 ) ); + } + private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) { NetworkSession session = newSession( connectionProvider, sessionMode ); @@ -1030,16 +1073,6 @@ private static void setupFailingBegin( Connection connection, Throwable error ) } ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() ); } - private void setupSuccessfulPullAll( String query ) - { - doAnswer( invocation -> - { - ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); - pullAllHandler.onSuccess( emptyMap() ); - return null; - } ).when( connection ).runAndFlush( eq( query ), eq( emptyMap() ), any(), any() ); - } - private static class TxWork implements TransactionWork { final int result; diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index a78038600d..d1082cf654 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -33,11 +33,13 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; import static java.util.Collections.emptyMap; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.MINUTES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -45,6 +47,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class TestUtil { @@ -157,10 +160,12 @@ public static Connection connectionMock() setupSuccessfulPullAll( connection, "COMMIT" ); setupSuccessfulPullAll( connection, "ROLLBACK" ); setupSuccessfulPullAll( connection, "BEGIN" ); + when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.serverVersion() ).thenReturn( ServerVersion.vInDev ); return connection; } - private static void setupSuccessfulPullAll( Connection connection, String statement ) + public static void setupSuccessfulPullAll( Connection connection, String statement ) { doAnswer( invocation -> {