From e13aad11fe396c70bed3ac6f3ec3ace6411b15ef Mon Sep 17 00:00:00 2001 From: lutovich Date: Sat, 28 Oct 2017 13:46:00 -0400 Subject: [PATCH 1/2] Separate connections for read/write access modes `NetworkSession` acquires connections from the connection pool for the given access mode. It can reuse an existing connection, when available. Previously session did not pay attention to access mode when reusing connections. This could result in READ connection being used for WRITE operation and vice versa. This commit fixes the problem by making session hold at most two connections at the same time - one read connection and one write connection. It will now continue using existing read/write connection for read/write operation when available. --- .../internal/LeakLoggingNetworkSession.java | 2 +- .../neo4j/driver/internal/NetworkSession.java | 93 +++++++++++++------ .../driver/internal/NetworkSessionTest.java | 44 ++++++--- .../org/neo4j/driver/v1/util/TestUtil.java | 7 +- 4 files changed, 102 insertions(+), 44 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index 5e7b3e294f..138afffe1a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -46,7 +46,7 @@ protected void finalize() throws Throwable private void logLeakIfNeeded() { - Boolean isOpen = Futures.getBlocking( currentConnectionIsOpen() ); + Boolean isOpen = Futures.getBlocking( hasOpenConnection() ); if ( isOpen ) { logger.error( "Neo4j Session object leaked, please ensure that your application" + diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index e61ce516fe..636cd5ebb8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -65,7 +65,8 @@ public class NetworkSession implements Session private volatile Bookmark bookmark = Bookmark.empty(); private volatile CompletionStage transactionStage = completedFuture( null ); - private volatile CompletionStage connectionStage = completedFuture( null ); + private volatile CompletionStage readConnectionStage = completedFuture( null ); + private volatile CompletionStage writeConnectionStage = completedFuture( null ); private final AtomicBoolean open = new AtomicBoolean( true ); @@ -250,7 +251,7 @@ public void reset() private CompletionStage resetAsync() { - return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() ) + return releaseConnections().thenCompose( ignore -> existingTransactionOrNull() ) .thenAccept( tx -> { if ( tx != null ) @@ -266,12 +267,14 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - CompletionStage currentConnectionIsOpen() + CompletionStage hasOpenConnection() + { + return hasOpenConnection( readConnectionStage ).thenCompose( readConnectionOpen -> + readConnectionOpen ? completedFuture( true ) : hasOpenConnection( writeConnectionStage ) ); + } + + private CompletionStage hasOpenConnection( CompletionStage connectionStage ) { - if ( connectionStage == null ) - { - return completedFuture( false ); - } return connectionStage.handle( ( connection, error ) -> error == null && // no acquisition error connection != null && // some connection has actually been acquired @@ -447,10 +450,28 @@ private CompletionStage beginTransactionAsync( AccessMode m private CompletionStage acquireConnection( AccessMode mode ) { - // memorize in local so same instance is transformed and used in callbacks - CompletionStage currentAsyncConnectionStage = connectionStage; + if ( mode == AccessMode.READ ) + { + CompletionStage currentConnectionStage = readConnectionStage; + readConnectionStage = acquireConnection( currentConnectionStage, mode ); + return readConnectionStage; + } + else if ( mode == AccessMode.WRITE ) + { + CompletionStage currentConnectionStage = writeConnectionStage; + writeConnectionStage = acquireConnection( currentConnectionStage, mode ); + return writeConnectionStage; + } + else + { + throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + } + } - connectionStage = currentAsyncConnectionStage + private CompletionStage acquireConnection( CompletionStage currentConnectionStage, + AccessMode mode ) + { + return currentConnectionStage .exceptionally( error -> null ) // handle previous acquisition failures .thenCompose( connection -> { @@ -458,7 +479,7 @@ private CompletionStage acquireConnection( AccessMode mode ) { // previous acquisition attempt was successful and connection has not been released yet // continue using same connection - return currentAsyncConnectionStage; + return currentConnectionStage; } else { @@ -467,13 +488,11 @@ private CompletionStage acquireConnection( AccessMode mode ) return connectionProvider.acquireConnection( mode ); } } ); - - return connectionStage; } private CompletionStage releaseResources() { - return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); + return rollbackTransaction().thenCompose( ignore -> releaseConnections() ); } private CompletionStage rollbackTransaction() @@ -493,16 +512,41 @@ private CompletionStage rollbackTransaction() } ); } - private CompletionStage releaseConnectionNow() + private CompletionStage releaseConnections() + { + CompletableFuture result = new CompletableFuture<>(); + releaseConnection( readConnectionStage ).whenComplete( ( ignore1, readConnectionError ) -> + releaseConnection( writeConnectionStage ).whenComplete( ( ignore2, writeConnectionError ) -> + afterConnectionsReleased( result, readConnectionError, writeConnectionError ) ) ); + return result; + } + + private CompletionStage releaseConnection( CompletionStage connectionStage ) { - return existingConnectionOrNull().thenCompose( connection -> + return connectionStage.exceptionally( error -> null ) + .thenCompose( connection -> connection == null ? completedFuture( null ) : connection.releaseNow() ); + } + + private void afterConnectionsReleased( CompletableFuture result, Throwable readConnectionError, + Throwable writeConnectionError ) + { + if ( readConnectionError != null && writeConnectionError != null ) { - if ( connection != null ) - { - return connection.releaseNow(); - } - return completedFuture( null ); - } ); + readConnectionError.addSuppressed( writeConnectionError ); + result.completeExceptionally( readConnectionError ); + } + else if ( readConnectionError != null ) + { + result.completeExceptionally( readConnectionError ); + } + else if ( writeConnectionError != null ) + { + result.completeExceptionally( writeConnectionError ); + } + else + { + result.complete( null ); + } } private CompletionStage ensureNoOpenTxBeforeRunningQuery() @@ -535,11 +579,6 @@ private CompletionStage existingTransactionOrNull() .thenApply( tx -> tx != null && tx.isOpen() ? tx : null ); } - private CompletionStage existingConnectionOrNull() - { - return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures - } - private void ensureSessionIsOpen() { if ( !open.get() ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 4ed7b2f19e..d98a319561 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -34,7 +34,6 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -74,6 +73,7 @@ import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.AccessMode.WRITE; import static org.neo4j.driver.v1.util.TestUtil.connectionMock; +import static org.neo4j.driver.v1.util.TestUtil.setupSuccessfulPullAll; public class NetworkSessionTest { @@ -88,8 +88,6 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); - when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) .thenReturn( completedFuture( connection ) ); @@ -252,7 +250,7 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun() public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { String query = "RETURN 1"; - setupSuccessfulPullAll( query ); + setupSuccessfulPullAll( connection, query ); session.run( query ); @@ -357,7 +355,7 @@ public void updatesBookmarkWhenTxIsClosed() public void releasesConnectionWhenTxIsClosed() { String query = "RETURN 42"; - setupSuccessfulPullAll( query ); + setupSuccessfulPullAll( connection, query ); Transaction tx = session.beginTransaction(); tx.run( query ); @@ -755,6 +753,32 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() verifyBeginTx( connection, times( 1 ) ); } + @Test + public void shouldNotUseReadConnectionForWriteAccessMode() + { + String readQuery = "RETURN 1"; + String writeQuery = "CREATE ()"; + + Connection readConnection = connectionMock(); + Connection writeConnection = connectionMock(); + when( readConnection.tryMarkInUse() ).thenReturn( true ); // allow read connection to always be used + when( writeConnection.tryMarkInUse() ).thenReturn( true ); // allow write connection to always be used + setupSuccessfulPullAll( readConnection, readQuery ); + setupSuccessfulPullAll( writeConnection, writeQuery ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( completedFuture( readConnection ) ); + when( connectionProvider.acquireConnection( WRITE ) ).thenReturn( completedFuture( writeConnection ) ); + + session.run( readQuery ); // query is executed with default READ access mode + session.writeTransaction( tx -> tx.run( writeQuery ) ); // write tx is executed with WRITE access mode + + // auto-commit query and write tx should've used different connections + verifyRunAndFlush( readConnection, writeQuery, never() ); + verifyRunAndFlush( writeConnection, readQuery, never() ); + + verifyRunAndFlush( readConnection, readQuery, times( 1 ) ); + verifyRunAndFlush( writeConnection, writeQuery, times( 1 ) ); + } + private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) { NetworkSession session = newSession( connectionProvider, sessionMode ); @@ -1030,16 +1054,6 @@ private static void setupFailingBegin( Connection connection, Throwable error ) } ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() ); } - private void setupSuccessfulPullAll( String query ) - { - doAnswer( invocation -> - { - ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); - pullAllHandler.onSuccess( emptyMap() ); - return null; - } ).when( connection ).runAndFlush( eq( query ), eq( emptyMap() ), any(), any() ); - } - private static class TxWork implements TransactionWork { final int result; diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index a78038600d..d1082cf654 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -33,11 +33,13 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; import static java.util.Collections.emptyMap; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.MINUTES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -45,6 +47,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class TestUtil { @@ -157,10 +160,12 @@ public static Connection connectionMock() setupSuccessfulPullAll( connection, "COMMIT" ); setupSuccessfulPullAll( connection, "ROLLBACK" ); setupSuccessfulPullAll( connection, "BEGIN" ); + when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.serverVersion() ).thenReturn( ServerVersion.vInDev ); return connection; } - private static void setupSuccessfulPullAll( Connection connection, String statement ) + public static void setupSuccessfulPullAll( Connection connection, String statement ) { doAnswer( invocation -> { From 50fc6f99f4ce60c05e0934ab8c85adeff0155b4f Mon Sep 17 00:00:00 2001 From: lutovich Date: Sat, 28 Oct 2017 13:59:58 -0400 Subject: [PATCH 2/2] Session#reset() releases connection last Previously it tried to first release connection and them mark existing transaction as terminated. This could've been problematic because it allowed transaction to be used on the client after RESET, which clears connection state on the server. This commit makes `Session#reset()` first mark existing transaction as terminated and only then release the connection. Releasing connection implies RESET. --- .../neo4j/driver/internal/NetworkSession.java | 15 +++++++-------- .../driver/internal/NetworkSessionTest.java | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 636cd5ebb8..0a6030195a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -251,14 +251,13 @@ public void reset() private CompletionStage resetAsync() { - return releaseConnections().thenCompose( ignore -> existingTransactionOrNull() ) - .thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ); + return existingTransactionOrNull().thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ).thenCompose( ignore -> releaseConnections() ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index d98a319561..2baa3d17a0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -601,6 +601,25 @@ public void transactionShouldBeRolledBackAfterSessionReset() assertFalse( tx.isOpen() ); } + @Test + public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + { + NetworkSession session = newSession( connectionProvider, READ ); + Transaction tx = session.beginTransaction(); + + assertTrue( tx.isOpen() ); + when( connection.releaseNow() ).then( invocation -> + { + // verify that tx is not open when connection is released + assertFalse( tx.isOpen() ); + return completedFuture( null ); + } ); + + session.reset(); + + verify( connection ).releaseNow(); + } + @Test public void shouldHaveNullLastBookmarkInitially() {