From f71b68f5e20666542591b54255dd1be23da22570 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 13 Nov 2017 11:36:17 +0100 Subject: [PATCH 1/2] Session#reset() releases connection last Previously it tried to first release connection and then mark existing transaction as terminated. This could've been problematic because it allowed transaction to be used on the client after RESET, which clears connection state on the server. This commit makes `Session#reset()` first mark existing transaction as terminated and only then release the connection. Releasing connection implies RESET. --- .../neo4j/driver/internal/NetworkSession.java | 15 +++++++-------- .../driver/internal/NetworkSessionTest.java | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index b40b1dd785..bfbcee4e4d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -254,14 +254,13 @@ public void reset() private CompletionStage resetAsync() { - return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() ) - .thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ); + return existingTransactionOrNull().thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ).thenCompose( ignore -> releaseConnectionNow() ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 5ea0f25731..a27801ff4d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -663,6 +663,25 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() verifyBeginTx( connection, times( 1 ) ); } + @Test + public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + { + NetworkSession session = newSession( connectionProvider, READ ); + Transaction tx = session.beginTransaction(); + + assertTrue( tx.isOpen() ); + when( connection.releaseNow() ).then( invocation -> + { + // verify that tx is not open when connection is released + assertFalse( tx.isOpen() ); + return completedFuture( null ); + } ); + + session.reset(); + + verify( connection ).releaseNow(); + } + private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) { NetworkSession session = newSession( connectionProvider, sessionMode ); From 5a174ebb34f445fb4ff050d1f4717133cdfe3176 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 13 Nov 2017 14:16:57 +0100 Subject: [PATCH 2/2] Simplified releasing of connections to the pool Replaced `Connection#releaseInBackground()` and `Connection#releaseNow()` with single `Connection#release()`. Two old methods only differed in return type. New method returns a `CompletionStage` which can either be ignored or chained, if needed. --- .../internal/DirectConnectionProvider.java | 2 +- .../driver/internal/ExplicitTransaction.java | 4 +- .../neo4j/driver/internal/NetworkSession.java | 8 +- .../internal/async/NettyConnection.java | 35 ++--- .../internal/async/RoutingConnection.java | 10 +- .../handlers/ResetResponseHandler.java | 6 - .../SessionPullAllResponseHandler.java | 9 +- .../neo4j/driver/internal/spi/Connection.java | 4 +- .../internal/ExplicitTransactionTest.java | 10 +- .../driver/internal/NetworkSessionTest.java | 19 ++- .../internal/async/NettyConnectionTest.java | 123 +++++++++++++++++- .../async/pool/ConnectionPoolImplTest.java | 4 +- .../handlers/ResetResponseHandlerTest.java | 29 +++-- .../SessionPullAllResponseHandlerTest.java | 4 +- .../v1/integration/ConnectionHandlingIT.java | 39 +++--- 15 files changed, 211 insertions(+), 95 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index bd2de5282e..50eceaa612 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -52,7 +52,7 @@ public CompletionStage acquireConnection( AccessMode mode ) @Override public CompletionStage verifyConnectivity() { - return acquireConnection( READ ).thenCompose( Connection::releaseNow ); + return acquireConnection( READ ).thenCompose( Connection::release ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 5bb8641208..a9d2548c17 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -122,7 +122,7 @@ public CompletionStage beginAsync( Bookmark initialBookmark if ( beginError != null ) { // release connection if begin failed, transaction can't be started - connection.releaseNow(); + connection.release(); throw new CompletionException( Futures.completionErrorCause( beginError ) ); } return tx; @@ -397,7 +397,7 @@ private BiConsumer transactionClosed( State newState ) return ( ignore, error ) -> { state = newState; - connection.releaseInBackground(); + connection.release(); // release in background session.setBookmark( bookmark ); }; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index bfbcee4e4d..f40fa53819 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -260,7 +260,7 @@ private CompletionStage resetAsync() { tx.markTerminated(); } - } ).thenCompose( ignore -> releaseConnectionNow() ); + } ).thenCompose( ignore -> releaseConnection() ); } @Override @@ -495,7 +495,7 @@ private CompletionStage acquireConnection( AccessMode mode ) private CompletionStage releaseResources() { - return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); + return rollbackTransaction().thenCompose( ignore -> releaseConnection() ); } private CompletionStage rollbackTransaction() @@ -515,13 +515,13 @@ private CompletionStage rollbackTransaction() } ); } - private CompletionStage releaseConnectionNow() + private CompletionStage releaseConnection() { return existingConnectionOrNull().thenCompose( connection -> { if ( connection != null ) { - return connection.releaseNow(); + return connection.release(); } return completedFuture( null ); } ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index 1447897bca..9532de17e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -39,14 +39,14 @@ import org.neo4j.driver.v1.Value; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.util.Futures.asCompletionStage; -// todo: keep state flags to prohibit interaction with released connections public class NettyConnection implements Connection { private final Channel channel; private final InboundMessageDispatcher messageDispatcher; + private final BoltServerAddress serverAddress; + private final ServerVersion serverVersion; private final ChannelPool channelPool; private final Clock clock; @@ -56,7 +56,9 @@ public class NettyConnection implements Connection public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) { this.channel = channel; - this.messageDispatcher = messageDispatcher( channel ); + this.messageDispatcher = ChannelAttributes.messageDispatcher( channel ); + this.serverAddress = ChannelAttributes.serverAddress( channel ); + this.serverVersion = ChannelAttributes.serverVersion( channel ); this.channelPool = channelPool; this.clock = clock; } @@ -70,6 +72,7 @@ public boolean isOpen() @Override public void enableAutoRead() { + assertOpen(); if ( autoReadEnabled.compareAndSet( false, true ) ) { setAutoRead( true ); @@ -79,6 +82,7 @@ public void enableAutoRead() @Override public void disableAutoRead() { + assertOpen(); if ( autoReadEnabled.compareAndSet( true, false ) ) { setAutoRead( false ); @@ -89,6 +93,7 @@ public void disableAutoRead() public void run( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { + assertOpen(); run( statement, parameters, runHandler, pullAllHandler, false ); } @@ -96,20 +101,12 @@ public void run( String statement, Map parameters, ResponseHandler public void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { + assertOpen(); run( statement, parameters, runHandler, pullAllHandler, true ); } @Override - public void releaseInBackground() - { - if ( open.compareAndSet( true, false ) ) - { - reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) ); - } - } - - @Override - public CompletionStage releaseNow() + public CompletionStage release() { if ( open.compareAndSet( true, false ) ) { @@ -126,13 +123,13 @@ public CompletionStage releaseNow() @Override public BoltServerAddress serverAddress() { - return ChannelAttributes.serverAddress( channel ); + return serverAddress; } @Override public ServerVersion serverVersion() { - return ChannelAttributes.serverVersion( channel ); + return serverVersion; } private void run( String statement, Map parameters, ResponseHandler runHandler, @@ -185,4 +182,12 @@ private void setAutoRead( boolean value ) { channel.config().setAutoRead( value ); } + + private void assertOpen() + { + if ( !open.get() ) + { + throw new IllegalStateException( "Connection has been released to the pool and can't be reused" ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index 1d08a336ca..0c164fd464 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -69,12 +69,6 @@ public void runAndFlush( String statement, Map parameters, Respons newRoutingResponseHandler( pullAllHandler ) ); } - @Override - public void releaseInBackground() - { - delegate.releaseInBackground(); - } - @Override public boolean isOpen() { @@ -82,9 +76,9 @@ public boolean isOpen() } @Override - public CompletionStage releaseNow() + public CompletionStage release() { - return delegate.releaseNow(); + return delegate.release(); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index 2247fbf48e..12bfd06e9d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -39,12 +39,6 @@ public class ResetResponseHandler implements ResponseHandler private final Clock clock; private final Promise releasePromise; - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, - Clock clock ) - { - this( channel, pool, messageDispatcher, clock, null ); - } - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, Clock clock, Promise releasePromise ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java index 0710881618..1f8df5888b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java @@ -32,12 +32,17 @@ public SessionPullAllResponseHandler( Statement statement, RunResponseHandler ru @Override protected void afterSuccess() { - connection.releaseInBackground(); + releaseConnection(); } @Override protected void afterFailure( Throwable error ) { - connection.releaseInBackground(); + releaseConnection(); + } + + private void releaseConnection() + { + connection.release(); // release in background } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 84d4b185f5..e3d2aeaab8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -39,9 +39,7 @@ void run( String statement, Map parameters, ResponseHandler runHan void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ); - void releaseInBackground(); - - CompletionStage releaseNow(); + CompletionStage release(); BoltServerAddress serverAddress(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 26b0deb160..e166e0f1e6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -58,7 +58,7 @@ public void shouldRollbackOnImplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -77,7 +77,7 @@ public void shouldRollbackOnExplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -95,7 +95,7 @@ public void shouldCommitOnSuccess() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "COMMIT" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -243,7 +243,7 @@ public void shouldReleaseConnectionWhenBeginFails() assertEquals( error, e ); } - verify( connection ).releaseNow(); + verify( connection ).release(); } @Test @@ -253,7 +253,7 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds() ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) ); - verify( connection, never() ).releaseNow(); + verify( connection, never() ).release(); } private static ExplicitTransaction beginTx( Connection connection ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index a27801ff4d..8ab3738587 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -60,6 +60,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -88,7 +89,7 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.release() ).thenReturn( completedFuture( null ) ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) @@ -214,7 +215,7 @@ public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() InOrder inOrder = inOrder( connection ); inOrder.verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); - inOrder.verify( connection ).releaseNow(); + inOrder.verify( connection, atLeastOnce() ).release(); } @SuppressWarnings( "deprecation" ) @@ -274,7 +275,7 @@ public void releasesConnectionWhenTxIsClosed() verify( connection ).runAndFlush( eq( query ), any(), any(), any() ); tx.close(); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } @Test @@ -484,22 +485,18 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows() } @Test - @SuppressWarnings( "deprecation" ) public void connectionShouldBeReleasedAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); session.run( "RETURN 1" ); - verify( connection, never() ).releaseInBackground(); - verify( connection, never() ).releaseNow(); + verify( connection, never() ).release(); session.reset(); - verify( connection, never() ).releaseInBackground(); - verify( connection ).releaseNow(); + verify( connection ).release(); } @Test - @SuppressWarnings( "deprecation" ) public void transactionShouldBeRolledBackAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); @@ -670,7 +667,7 @@ public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() Transaction tx = session.beginTransaction(); assertTrue( tx.isOpen() ); - when( connection.releaseNow() ).then( invocation -> + when( connection.release() ).then( invocation -> { // verify that tx is not open when connection is released assertFalse( tx.isOpen() ); @@ -679,7 +676,7 @@ public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() session.reset(); - verify( connection ).releaseNow(); + verify( connection ).release(); } private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index fe4d526431..94401b2a4c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -37,6 +37,7 @@ import org.neo4j.driver.internal.handlers.NoOpResponseHandler; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.FakeClock; +import org.neo4j.driver.internal.util.ServerVersion; import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.startsWith; @@ -44,6 +45,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -77,7 +79,7 @@ public void shouldBeOpenAfterCreated() public void shouldNotBeOpenAfterRelease() { NettyConnection connection = newConnection( new EmbeddedChannel() ); - connection.releaseNow(); + connection.release(); assertFalse( connection.isOpen() ); } @@ -90,7 +92,7 @@ public void shouldSendResetOnRelease() NettyConnection connection = newConnection( channel ); - connection.releaseNow(); + connection.release(); channel.runPendingTasks(); assertEquals( 1, channel.outboundMessages().size() ); @@ -112,15 +114,116 @@ public void shouldWriteRunAndFlushInEventLoopThread() throws Exception } @Test - public void shouldWriteReleaseInEventLoopThread() throws Exception + public void shouldWriteForceReleaseInEventLoopThread() throws Exception { - testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::releaseInBackground ); + testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::release ); } @Test - public void shouldWriteForceReleaseInEventLoopThread() throws Exception + public void shouldNotEnableAutoReadWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( false ); + + NettyConnection connection = newConnection( channel ); + + connection.release(); + + try + { + connection.enableAutoRead(); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + assertFalse( channel.config().isAutoRead() ); + } + + @Test + public void shouldNotDisableAutoReadWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( true ); + + NettyConnection connection = newConnection( channel ); + + connection.release(); + + try + { + connection.disableAutoRead(); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + assertTrue( channel.config().isAutoRead() ); + } + + @Test + public void shouldNotRunWhenReleased() + { + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.release(); + + try + { + connection.run( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), mock( ResponseHandler.class ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + } + + @Test + public void shouldNotRunAndFlushWhenReleased() + { + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.release(); + + try + { + connection.runAndFlush( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), + mock( ResponseHandler.class ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + } + + @Test + public void shouldReturnServerAddressWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + BoltServerAddress address = new BoltServerAddress( "host", 4242 ); + ChannelAttributes.setServerAddress( channel, address ); + + NettyConnection connection = newConnection( channel ); + connection.release(); + + assertEquals( address, connection.serverAddress() ); + } + + @Test + public void shouldReturnServerVersionWhenReleased() { - testWriteInEventLoop( "ReleaseNowTestEventLoop", NettyConnection::releaseNow ); + EmbeddedChannel channel = new EmbeddedChannel(); + ServerVersion version = ServerVersion.v3_2_0; + ChannelAttributes.setServerVersion( channel, version ); + + NettyConnection connection = newConnection( channel ); + connection.release(); + + assertEquals( version, connection.serverVersion() ); } private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception @@ -162,10 +265,15 @@ private static NettyConnection newConnection( EmbeddedChannel channel ) return new NettyConnection( channel, mock( ChannelPool.class ), new FakeClock() ); } + private static void assertConnectionReleasedError( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Connection has been released" ) ); + } + private static class ThreadTrackingInboundMessageDispatcher extends InboundMessageDispatcher { - final Set queueThreadNames = new ConcurrentSet<>(); + final Set queueThreadNames = new ConcurrentSet<>(); ThreadTrackingInboundMessageDispatcher( Channel channel ) { super( channel, DEV_NULL_LOGGING ); @@ -177,5 +285,6 @@ public void queue( ResponseHandler handler ) queueThreadNames.add( Thread.currentThread().getName() ); super.queue( handler ); } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java index 1954fbb634..6ebe54b941 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java @@ -77,7 +77,7 @@ public void shouldAcquireConnectionWhenPoolIsEmpty() throws Exception public void shouldAcquireIdleConnection() throws Exception { Connection connection1 = await( pool.acquire( neo4j.address() ) ); - await( connection1.releaseNow() ); + await( connection1.release() ); Connection connection2 = await( pool.acquire( neo4j.address() ) ); assertNotNull( connection2 ); @@ -102,7 +102,7 @@ public void shouldFailToAcquireConnectionToWrongAddress() throws Exception public void shouldFailToAcquireWhenPoolClosed() throws Exception { Connection connection = await( pool.acquire( neo4j.address() ) ); - await( connection.releaseNow() ); + await( connection.release() ); await( pool.close() ); try diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index d12849e141..d97818f219 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -25,10 +25,13 @@ import org.junit.Test; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FakeClock; import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; @@ -50,12 +53,12 @@ public void shouldReleaseChannelOnSuccess() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 5 ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); + ResetResponseHandler handler = newHandler( pool, clock ); handler.onSuccess( emptyMap() ); verifyLastUsedTimestamp( 5 ); - verify( pool ).release( channel ); + verify( pool ).release( eq( channel ), any() ); } @Test @@ -65,7 +68,7 @@ public void shouldReleaseChannelWithPromiseOnSuccess() FakeClock clock = new FakeClock(); clock.progress( 42 ); Promise promise = channel.newPromise(); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + ResetResponseHandler handler = newHandler( pool, clock, promise ); handler.onSuccess( emptyMap() ); @@ -79,12 +82,12 @@ public void shouldReleaseChannelOnFailure() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 100 ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); + ResetResponseHandler handler = newHandler( pool, clock ); handler.onFailure( new RuntimeException() ); verifyLastUsedTimestamp( 100 ); - verify( pool ).release( channel ); + verify( pool ).release( eq( channel ), any() ); } @Test @@ -94,7 +97,7 @@ public void shouldReleaseChannelWithPromiseOnFailure() FakeClock clock = new FakeClock(); clock.progress( 99 ); Promise promise = channel.newPromise(); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + ResetResponseHandler handler = newHandler( pool, clock, promise ); handler.onFailure( new RuntimeException() ); @@ -106,7 +109,7 @@ public void shouldReleaseChannelWithPromiseOnFailure() public void shouldUnMuteAckFailureOnSuccess() { ChannelPool pool = mock( ChannelPool.class ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + ResetResponseHandler handler = newHandler( pool, new FakeClock() ); handler.onSuccess( emptyMap() ); @@ -117,7 +120,7 @@ public void shouldUnMuteAckFailureOnSuccess() public void shouldUnMuteAckFailureOnFailure() { ChannelPool pool = mock( ChannelPool.class ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + ResetResponseHandler handler = newHandler( pool, new FakeClock() ); handler.onFailure( new RuntimeException() ); @@ -128,4 +131,14 @@ private void verifyLastUsedTimestamp( int expectedValue ) { assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); } + + private ResetResponseHandler newHandler( ChannelPool pool, Clock clock ) + { + return newHandler( pool, clock, channel.newPromise() ); + } + + private ResetResponseHandler newHandler( ChannelPool pool, Clock clock, Promise promise ) + { + return new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java index a57f3984ff..793003bca4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java @@ -41,7 +41,7 @@ public void shouldReleaseConnectionOnSuccess() handler.onSuccess( emptyMap() ); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } @Test @@ -52,7 +52,7 @@ public void shouldReleaseConnectionOnFailure() handler.onFailure( new RuntimeException() ); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } private SessionPullAllResponseHandler newHandler( Connection connection ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java index 4dd885888b..3261bd9dc9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java @@ -61,6 +61,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -99,13 +100,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultConsumed() StatementResult result = createNodesInNewSession( 12 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); result.consume(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -114,14 +115,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultSummaryObtaine StatementResult result = createNodesInNewSession( 5 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); ResultSummary summary = result.summary(); assertEquals( 5, summary.counters().nodesCreated() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -130,14 +131,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedInList( StatementResult result = createNodesInNewSession( 2 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); List records = result.list(); assertEquals( 2, records.size() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -146,13 +147,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenSingleRecordFetched( StatementResult result = createNodesInNewSession( 1 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); assertNotNull( result.single() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -161,7 +162,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera StatementResult result = createNodesInNewSession( 6 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); int seenRecords = 0; while ( result.hasNext() ) @@ -173,7 +174,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -184,7 +185,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes StatementResult result = session.run( "UNWIND range(10, 0, -1) AS i CREATE (n {index: 10/i}) RETURN n" ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); try { @@ -198,7 +199,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -209,7 +210,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); StatementResult result = createNodes( 5, tx ); tx.success(); @@ -217,7 +218,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); assertEquals( 5, result.list().size() ); } @@ -230,7 +231,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); StatementResult result = createNodes( 8, tx ); tx.failure(); @@ -238,7 +239,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); assertEquals( 8, result.list().size() ); } @@ -252,12 +253,12 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1 ).releaseInBackground(); // connection used for constraint creation + verify( connection1, atLeastOnce() ).release(); // connection used for constraint creation Session session = driver.session(); Transaction tx = session.beginTransaction(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; - verify( connection2, never() ).releaseInBackground(); + verify( connection2, never() ).release(); // property existence constraints are verified on commit, try to violate it tx.run( "CREATE (:Book)" ); @@ -274,7 +275,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } // connection should have been released after failed node creation - verify( connection2 ).releaseInBackground(); + verify( connection2 ).release(); } private StatementResult createNodesInNewSession( int nodesToCreate )