diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index bd2de5282e..50eceaa612 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -52,7 +52,7 @@ public CompletionStage acquireConnection( AccessMode mode ) @Override public CompletionStage verifyConnectivity() { - return acquireConnection( READ ).thenCompose( Connection::releaseNow ); + return acquireConnection( READ ).thenCompose( Connection::release ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 5bb8641208..a9d2548c17 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -122,7 +122,7 @@ public CompletionStage beginAsync( Bookmark initialBookmark if ( beginError != null ) { // release connection if begin failed, transaction can't be started - connection.releaseNow(); + connection.release(); throw new CompletionException( Futures.completionErrorCause( beginError ) ); } return tx; @@ -397,7 +397,7 @@ private BiConsumer transactionClosed( State newState ) return ( ignore, error ) -> { state = newState; - connection.releaseInBackground(); + connection.release(); // release in background session.setBookmark( bookmark ); }; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index b40b1dd785..f40fa53819 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -254,14 +254,13 @@ public void reset() private CompletionStage resetAsync() { - return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() ) - .thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ); + return existingTransactionOrNull().thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ).thenCompose( ignore -> releaseConnection() ); } @Override @@ -496,7 +495,7 @@ private CompletionStage acquireConnection( AccessMode mode ) private CompletionStage releaseResources() { - return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); + return rollbackTransaction().thenCompose( ignore -> releaseConnection() ); } private CompletionStage rollbackTransaction() @@ -516,13 +515,13 @@ private CompletionStage rollbackTransaction() } ); } - private CompletionStage releaseConnectionNow() + private CompletionStage releaseConnection() { return existingConnectionOrNull().thenCompose( connection -> { if ( connection != null ) { - return connection.releaseNow(); + return connection.release(); } return completedFuture( null ); } ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index 1447897bca..9532de17e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -39,14 +39,14 @@ import org.neo4j.driver.v1.Value; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.util.Futures.asCompletionStage; -// todo: keep state flags to prohibit interaction with released connections public class NettyConnection implements Connection { private final Channel channel; private final InboundMessageDispatcher messageDispatcher; + private final BoltServerAddress serverAddress; + private final ServerVersion serverVersion; private final ChannelPool channelPool; private final Clock clock; @@ -56,7 +56,9 @@ public class NettyConnection implements Connection public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) { this.channel = channel; - this.messageDispatcher = messageDispatcher( channel ); + this.messageDispatcher = ChannelAttributes.messageDispatcher( channel ); + this.serverAddress = ChannelAttributes.serverAddress( channel ); + this.serverVersion = ChannelAttributes.serverVersion( channel ); this.channelPool = channelPool; this.clock = clock; } @@ -70,6 +72,7 @@ public boolean isOpen() @Override public void enableAutoRead() { + assertOpen(); if ( autoReadEnabled.compareAndSet( false, true ) ) { setAutoRead( true ); @@ -79,6 +82,7 @@ public void enableAutoRead() @Override public void disableAutoRead() { + assertOpen(); if ( autoReadEnabled.compareAndSet( true, false ) ) { setAutoRead( false ); @@ -89,6 +93,7 @@ public void disableAutoRead() public void run( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { + assertOpen(); run( statement, parameters, runHandler, pullAllHandler, false ); } @@ -96,20 +101,12 @@ public void run( String statement, Map parameters, ResponseHandler public void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { + assertOpen(); run( statement, parameters, runHandler, pullAllHandler, true ); } @Override - public void releaseInBackground() - { - if ( open.compareAndSet( true, false ) ) - { - reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) ); - } - } - - @Override - public CompletionStage releaseNow() + public CompletionStage release() { if ( open.compareAndSet( true, false ) ) { @@ -126,13 +123,13 @@ public CompletionStage releaseNow() @Override public BoltServerAddress serverAddress() { - return ChannelAttributes.serverAddress( channel ); + return serverAddress; } @Override public ServerVersion serverVersion() { - return ChannelAttributes.serverVersion( channel ); + return serverVersion; } private void run( String statement, Map parameters, ResponseHandler runHandler, @@ -185,4 +182,12 @@ private void setAutoRead( boolean value ) { channel.config().setAutoRead( value ); } + + private void assertOpen() + { + if ( !open.get() ) + { + throw new IllegalStateException( "Connection has been released to the pool and can't be reused" ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index 1d08a336ca..0c164fd464 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -69,12 +69,6 @@ public void runAndFlush( String statement, Map parameters, Respons newRoutingResponseHandler( pullAllHandler ) ); } - @Override - public void releaseInBackground() - { - delegate.releaseInBackground(); - } - @Override public boolean isOpen() { @@ -82,9 +76,9 @@ public boolean isOpen() } @Override - public CompletionStage releaseNow() + public CompletionStage release() { - return delegate.releaseNow(); + return delegate.release(); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index 2247fbf48e..12bfd06e9d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -39,12 +39,6 @@ public class ResetResponseHandler implements ResponseHandler private final Clock clock; private final Promise releasePromise; - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, - Clock clock ) - { - this( channel, pool, messageDispatcher, clock, null ); - } - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, Clock clock, Promise releasePromise ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java index 0710881618..1f8df5888b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java @@ -32,12 +32,17 @@ public SessionPullAllResponseHandler( Statement statement, RunResponseHandler ru @Override protected void afterSuccess() { - connection.releaseInBackground(); + releaseConnection(); } @Override protected void afterFailure( Throwable error ) { - connection.releaseInBackground(); + releaseConnection(); + } + + private void releaseConnection() + { + connection.release(); // release in background } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 84d4b185f5..e3d2aeaab8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -39,9 +39,7 @@ void run( String statement, Map parameters, ResponseHandler runHan void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ); - void releaseInBackground(); - - CompletionStage releaseNow(); + CompletionStage release(); BoltServerAddress serverAddress(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 26b0deb160..e166e0f1e6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -58,7 +58,7 @@ public void shouldRollbackOnImplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -77,7 +77,7 @@ public void shouldRollbackOnExplicitFailure() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -95,7 +95,7 @@ public void shouldCommitOnSuccess() InOrder order = inOrder( connection ); order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() ); order.verify( connection ).runAndFlush( eq( "COMMIT" ), any(), any(), any() ); - order.verify( connection ).releaseInBackground(); + order.verify( connection ).release(); } @Test @@ -243,7 +243,7 @@ public void shouldReleaseConnectionWhenBeginFails() assertEquals( error, e ); } - verify( connection ).releaseNow(); + verify( connection ).release(); } @Test @@ -253,7 +253,7 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds() ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) ); - verify( connection, never() ).releaseNow(); + verify( connection, never() ).release(); } private static ExplicitTransaction beginTx( Connection connection ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 5ea0f25731..8ab3738587 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -60,6 +60,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -88,7 +89,7 @@ public class NetworkSessionTest public void setUp() { connection = connectionMock(); - when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.release() ).thenReturn( completedFuture( null ) ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) @@ -214,7 +215,7 @@ public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() InOrder inOrder = inOrder( connection ); inOrder.verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); - inOrder.verify( connection ).releaseNow(); + inOrder.verify( connection, atLeastOnce() ).release(); } @SuppressWarnings( "deprecation" ) @@ -274,7 +275,7 @@ public void releasesConnectionWhenTxIsClosed() verify( connection ).runAndFlush( eq( query ), any(), any(), any() ); tx.close(); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } @Test @@ -484,22 +485,18 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows() } @Test - @SuppressWarnings( "deprecation" ) public void connectionShouldBeReleasedAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); session.run( "RETURN 1" ); - verify( connection, never() ).releaseInBackground(); - verify( connection, never() ).releaseNow(); + verify( connection, never() ).release(); session.reset(); - verify( connection, never() ).releaseInBackground(); - verify( connection ).releaseNow(); + verify( connection ).release(); } @Test - @SuppressWarnings( "deprecation" ) public void transactionShouldBeRolledBackAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); @@ -663,6 +660,25 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() verifyBeginTx( connection, times( 1 ) ); } + @Test + public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + { + NetworkSession session = newSession( connectionProvider, READ ); + Transaction tx = session.beginTransaction(); + + assertTrue( tx.isOpen() ); + when( connection.release() ).then( invocation -> + { + // verify that tx is not open when connection is released + assertFalse( tx.isOpen() ); + return completedFuture( null ); + } ); + + session.reset(); + + verify( connection ).release(); + } + private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) { NetworkSession session = newSession( connectionProvider, sessionMode ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index fe4d526431..94401b2a4c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -37,6 +37,7 @@ import org.neo4j.driver.internal.handlers.NoOpResponseHandler; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.FakeClock; +import org.neo4j.driver.internal.util.ServerVersion; import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.startsWith; @@ -44,6 +45,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -77,7 +79,7 @@ public void shouldBeOpenAfterCreated() public void shouldNotBeOpenAfterRelease() { NettyConnection connection = newConnection( new EmbeddedChannel() ); - connection.releaseNow(); + connection.release(); assertFalse( connection.isOpen() ); } @@ -90,7 +92,7 @@ public void shouldSendResetOnRelease() NettyConnection connection = newConnection( channel ); - connection.releaseNow(); + connection.release(); channel.runPendingTasks(); assertEquals( 1, channel.outboundMessages().size() ); @@ -112,15 +114,116 @@ public void shouldWriteRunAndFlushInEventLoopThread() throws Exception } @Test - public void shouldWriteReleaseInEventLoopThread() throws Exception + public void shouldWriteForceReleaseInEventLoopThread() throws Exception { - testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::releaseInBackground ); + testWriteInEventLoop( "ReleaseTestEventLoop", NettyConnection::release ); } @Test - public void shouldWriteForceReleaseInEventLoopThread() throws Exception + public void shouldNotEnableAutoReadWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( false ); + + NettyConnection connection = newConnection( channel ); + + connection.release(); + + try + { + connection.enableAutoRead(); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + assertFalse( channel.config().isAutoRead() ); + } + + @Test + public void shouldNotDisableAutoReadWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( true ); + + NettyConnection connection = newConnection( channel ); + + connection.release(); + + try + { + connection.disableAutoRead(); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + assertTrue( channel.config().isAutoRead() ); + } + + @Test + public void shouldNotRunWhenReleased() + { + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.release(); + + try + { + connection.run( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), mock( ResponseHandler.class ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + } + + @Test + public void shouldNotRunAndFlushWhenReleased() + { + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.release(); + + try + { + connection.runAndFlush( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), + mock( ResponseHandler.class ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertConnectionReleasedError( e ); + } + } + + @Test + public void shouldReturnServerAddressWhenReleased() + { + EmbeddedChannel channel = new EmbeddedChannel(); + BoltServerAddress address = new BoltServerAddress( "host", 4242 ); + ChannelAttributes.setServerAddress( channel, address ); + + NettyConnection connection = newConnection( channel ); + connection.release(); + + assertEquals( address, connection.serverAddress() ); + } + + @Test + public void shouldReturnServerVersionWhenReleased() { - testWriteInEventLoop( "ReleaseNowTestEventLoop", NettyConnection::releaseNow ); + EmbeddedChannel channel = new EmbeddedChannel(); + ServerVersion version = ServerVersion.v3_2_0; + ChannelAttributes.setServerVersion( channel, version ); + + NettyConnection connection = newConnection( channel ); + connection.release(); + + assertEquals( version, connection.serverVersion() ); } private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception @@ -162,10 +265,15 @@ private static NettyConnection newConnection( EmbeddedChannel channel ) return new NettyConnection( channel, mock( ChannelPool.class ), new FakeClock() ); } + private static void assertConnectionReleasedError( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Connection has been released" ) ); + } + private static class ThreadTrackingInboundMessageDispatcher extends InboundMessageDispatcher { - final Set queueThreadNames = new ConcurrentSet<>(); + final Set queueThreadNames = new ConcurrentSet<>(); ThreadTrackingInboundMessageDispatcher( Channel channel ) { super( channel, DEV_NULL_LOGGING ); @@ -177,5 +285,6 @@ public void queue( ResponseHandler handler ) queueThreadNames.add( Thread.currentThread().getName() ); super.queue( handler ); } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java index 1954fbb634..6ebe54b941 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java @@ -77,7 +77,7 @@ public void shouldAcquireConnectionWhenPoolIsEmpty() throws Exception public void shouldAcquireIdleConnection() throws Exception { Connection connection1 = await( pool.acquire( neo4j.address() ) ); - await( connection1.releaseNow() ); + await( connection1.release() ); Connection connection2 = await( pool.acquire( neo4j.address() ) ); assertNotNull( connection2 ); @@ -102,7 +102,7 @@ public void shouldFailToAcquireConnectionToWrongAddress() throws Exception public void shouldFailToAcquireWhenPoolClosed() throws Exception { Connection connection = await( pool.acquire( neo4j.address() ) ); - await( connection.releaseNow() ); + await( connection.release() ); await( pool.close() ); try diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index d12849e141..d97818f219 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -25,10 +25,13 @@ import org.junit.Test; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FakeClock; import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; @@ -50,12 +53,12 @@ public void shouldReleaseChannelOnSuccess() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 5 ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); + ResetResponseHandler handler = newHandler( pool, clock ); handler.onSuccess( emptyMap() ); verifyLastUsedTimestamp( 5 ); - verify( pool ).release( channel ); + verify( pool ).release( eq( channel ), any() ); } @Test @@ -65,7 +68,7 @@ public void shouldReleaseChannelWithPromiseOnSuccess() FakeClock clock = new FakeClock(); clock.progress( 42 ); Promise promise = channel.newPromise(); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + ResetResponseHandler handler = newHandler( pool, clock, promise ); handler.onSuccess( emptyMap() ); @@ -79,12 +82,12 @@ public void shouldReleaseChannelOnFailure() ChannelPool pool = mock( ChannelPool.class ); FakeClock clock = new FakeClock(); clock.progress( 100 ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock ); + ResetResponseHandler handler = newHandler( pool, clock ); handler.onFailure( new RuntimeException() ); verifyLastUsedTimestamp( 100 ); - verify( pool ).release( channel ); + verify( pool ).release( eq( channel ), any() ); } @Test @@ -94,7 +97,7 @@ public void shouldReleaseChannelWithPromiseOnFailure() FakeClock clock = new FakeClock(); clock.progress( 99 ); Promise promise = channel.newPromise(); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + ResetResponseHandler handler = newHandler( pool, clock, promise ); handler.onFailure( new RuntimeException() ); @@ -106,7 +109,7 @@ public void shouldReleaseChannelWithPromiseOnFailure() public void shouldUnMuteAckFailureOnSuccess() { ChannelPool pool = mock( ChannelPool.class ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + ResetResponseHandler handler = newHandler( pool, new FakeClock() ); handler.onSuccess( emptyMap() ); @@ -117,7 +120,7 @@ public void shouldUnMuteAckFailureOnSuccess() public void shouldUnMuteAckFailureOnFailure() { ChannelPool pool = mock( ChannelPool.class ); - ResetResponseHandler handler = new ResetResponseHandler( channel, pool, messageDispatcher, new FakeClock() ); + ResetResponseHandler handler = newHandler( pool, new FakeClock() ); handler.onFailure( new RuntimeException() ); @@ -128,4 +131,14 @@ private void verifyLastUsedTimestamp( int expectedValue ) { assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); } + + private ResetResponseHandler newHandler( ChannelPool pool, Clock clock ) + { + return newHandler( pool, clock, channel.newPromise() ); + } + + private ResetResponseHandler newHandler( ChannelPool pool, Clock clock, Promise promise ) + { + return new ResetResponseHandler( channel, pool, messageDispatcher, clock, promise ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java index a57f3984ff..793003bca4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java @@ -41,7 +41,7 @@ public void shouldReleaseConnectionOnSuccess() handler.onSuccess( emptyMap() ); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } @Test @@ -52,7 +52,7 @@ public void shouldReleaseConnectionOnFailure() handler.onFailure( new RuntimeException() ); - verify( connection ).releaseInBackground(); + verify( connection ).release(); } private SessionPullAllResponseHandler newHandler( Connection connection ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java index 4dd885888b..3261bd9dc9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java @@ -61,6 +61,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -99,13 +100,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultConsumed() StatementResult result = createNodesInNewSession( 12 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); result.consume(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -114,14 +115,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultSummaryObtaine StatementResult result = createNodesInNewSession( 5 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); ResultSummary summary = result.summary(); assertEquals( 5, summary.counters().nodesCreated() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -130,14 +131,14 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedInList( StatementResult result = createNodesInNewSession( 2 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); List records = result.list(); assertEquals( 2, records.size() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -146,13 +147,13 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenSingleRecordFetched( StatementResult result = createNodesInNewSession( 1 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); assertNotNull( result.single() ); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -161,7 +162,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera StatementResult result = createNodesInNewSession( 6 ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); int seenRecords = 0; while ( result.hasNext() ) @@ -173,7 +174,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsItera Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -184,7 +185,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes StatementResult result = session.run( "UNWIND range(10, 0, -1) AS i CREATE (n {index: 10/i}) RETURN n" ); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); try { @@ -198,7 +199,7 @@ public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringRes Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); } @Test @@ -209,7 +210,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); StatementResult result = createNodes( 5, tx ); tx.success(); @@ -217,7 +218,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitte Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); assertEquals( 5, result.list().size() ); } @@ -230,7 +231,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Transaction tx = session.beginTransaction(); Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1, never() ).releaseInBackground(); + verify( connection1, never() ).release(); StatementResult result = createNodes( 8, tx ); tx.failure(); @@ -238,7 +239,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBa Connection connection2 = connectionPool.lastAcquiredConnectionSpy; assertSame( connection1, connection2 ); - verify( connection1 ).releaseInBackground(); + verify( connection1 ).release(); assertEquals( 8, result.list().size() ); } @@ -252,12 +253,12 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } Connection connection1 = connectionPool.lastAcquiredConnectionSpy; - verify( connection1 ).releaseInBackground(); // connection used for constraint creation + verify( connection1, atLeastOnce() ).release(); // connection used for constraint creation Session session = driver.session(); Transaction tx = session.beginTransaction(); Connection connection2 = connectionPool.lastAcquiredConnectionSpy; - verify( connection2, never() ).releaseInBackground(); + verify( connection2, never() ).release(); // property existence constraints are verified on commit, try to violate it tx.run( "CREATE (:Book)" ); @@ -274,7 +275,7 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC } // connection should have been released after failed node creation - verify( connection2 ).releaseInBackground(); + verify( connection2 ).release(); } private StatementResult createNodesInNewSession( int nodesToCreate )