diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index a62f757aa5..10d335a51e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -105,9 +105,15 @@ public PooledConnection acquire( Supplier supplier ) return connection; } - public List toList() + public int activeConnections() { - return new ArrayList<>( queue ); + return acquiredConnections.size(); + } + + void disposeBroken( PooledConnection connection ) + { + acquiredConnections.remove( connection ); + disposeSafely( connection ); } public boolean isEmpty() diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index 84430fe27d..a7431d94e3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection ) } else { - pooledConnection.dispose(); + connections.disposeBroken( pooledConnection ); } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 463525ecaa..06184dcae6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -109,6 +109,12 @@ public void close() } } + public int activeConnections( BoltServerAddress address ) + { + BlockingPooledConnectionQueue connectionQueue = pools.get( address ); + return connectionQueue == null ? 0 : connectionQueue.activeConnections(); + } + private BlockingPooledConnectionQueue pool( BoltServerAddress address ) { BlockingPooledConnectionQueue pool = pools.get( address ); @@ -130,10 +136,16 @@ private PooledConnection acquireConnection( BoltServerAddress address, { ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address ); - PooledConnection connection; + PooledConnection connection = null; boolean connectionCreated; do { + // dispose previous connection that can't be acquired + if ( connection != null ) + { + connectionQueue.disposeBroken( connection ); + } + connection = connectionQueue.acquire( connectionSupplier ); connectionCreated = connectionSupplier.connectionCreated(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 67d6fad4bd..98aaf7539f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -239,6 +239,67 @@ public void shouldTerminateBothAcquiredAndIdleConnections() verify( connection4 ).dispose(); } + @Test + public void shouldReportZeroActiveConnectionsWhenEmpty() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + public void shouldReportZeroActiveConnectionsWhenHasOnlyIdleConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + queue.offer( mock( PooledConnection.class ) ); + queue.offer( mock( PooledConnection.class ) ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void shouldReportActiveConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + PooledConnection connection3 = mock( PooledConnection.class ); + + queue.offer( connection1 ); + queue.offer( connection2 ); + queue.offer( connection3 ); + + queue.acquire( mock( Supplier.class ) ); + queue.acquire( mock( Supplier.class ) ); + queue.acquire( mock( Supplier.class ) ); + + assertEquals( 3, queue.activeConnections() ); + + queue.offer( connection1 ); + queue.offer( connection2 ); + queue.offer( connection3 ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void shouldDisposeBrokenConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + queue.offer( mock( PooledConnection.class ) ); + PooledConnection connection = queue.acquire( mock( Supplier.class ) ); + assertEquals( 1, queue.activeConnections() ); + + queue.disposeBroken( connection ); + assertEquals( 0, queue.activeConnections() ); + verify( connection ).dispose(); + } + private static BlockingPooledConnectionQueue newConnectionQueue( int capacity ) { return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java new file mode 100644 index 0000000000..305876daa4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import org.junit.Test; + +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; + +public class PooledConnectionReleaseConsumerTest +{ + @Test + public void shouldOfferReusableConnectionsBackToTheConnectionsQueue() + { + BlockingPooledConnectionQueue queue = newConnectionQueue(); + PooledConnection connection = acquireConnection( queue ); + + PooledConnectionValidator validator = newConnectionValidator( true ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator ); + + releaseConsumer.accept( connection ); + + // connection should now be idle + assertEquals( 0, queue.activeConnections() ); + assertEquals( 1, queue.size() ); + + verify( connection ).reset(); + verify( connection ).sync(); + } + + @Test + public void shouldAskConnectionsQueueToDisposeNotReusableConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue(); + PooledConnection connection = acquireConnection( queue ); + + PooledConnectionValidator validator = newConnectionValidator( false ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator ); + + releaseConsumer.accept( connection ); + + // connection should've been disposed + assertEquals( 0, queue.activeConnections() ); + assertEquals( 0, queue.size() ); + + verify( connection ).dispose(); + } + + private static BlockingPooledConnectionQueue newConnectionQueue() + { + return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING ); + } + + @SuppressWarnings( "unchecked" ) + private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue ) + { + queue.offer( newConnectionMock() ); + PooledConnection connection = queue.acquire( mock( Supplier.class ) ); + assertEquals( 1, queue.activeConnections() ); + return connection; + } + + private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections ) + { + ConnectionPool pool = mock( ConnectionPool.class ); + when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections ); + return new PooledConnectionValidator( pool ); + } + + private static PooledConnection newConnectionMock() + { + PooledConnection connection = mock( PooledConnection.class ); + when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT ); + return connection; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index d761981469..779ce4bf1b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -24,7 +24,9 @@ import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -42,11 +44,13 @@ import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Value; import static java.util.Collections.newSetFromMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; @@ -56,6 +60,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -64,6 +69,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; +import static org.neo4j.driver.v1.Values.value; public class SocketConnectionPoolTest { @@ -489,6 +495,98 @@ public void acquireRetriesUntilAConnectionIsCreated() inOrder.verify( connection4, never() ).sync(); } + @Test + public void reportActiveConnectionsWhenEmpty() + { + SocketConnectionPool pool = newPool( newMockConnector() ); + + int activeConnections1 = pool.activeConnections( ADDRESS_1 ); + int activeConnections2 = pool.activeConnections( ADDRESS_2 ); + int activeConnections3 = pool.activeConnections( ADDRESS_3 ); + + assertEquals( 0, activeConnections1 ); + assertEquals( 0, activeConnections2 ); + assertEquals( 0, activeConnections3 ); + } + + @Test + public void reportActiveConnectionsWhenHasAcquiredConnections() + { + int acquiredConnections = 23; + SocketConnectionPool pool = newPool( newMockConnector() ); + + for ( int i = 0; i < acquiredConnections; i++ ) + { + assertNotNull( pool.acquire( ADDRESS_1 ) ); + } + + assertEquals( acquiredConnections, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void reportActiveConnectionsWhenHasIdleConnections() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + Connector connector = newMockConnector( connection ); + SocketConnectionPool pool = newPool( connector ); + + PooledConnection connection1 = pool.acquire( ADDRESS_1 ); + PooledConnection connection2 = pool.acquire( ADDRESS_1 ); + + assertEquals( 2, pool.activeConnections( ADDRESS_1 ) ); + + connection1.close(); + connection2.close(); + + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void shouldForgetBrokenIdleConnection() + { + Connection connection1 = newConnectionMock( ADDRESS_1 ); + Connection connection2 = newConnectionMock( ADDRESS_1 ); + + doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset(); + + int idleTimeBeforeConnectionTest = 42; + FakeClock clock = new FakeClock(); + Connector connector = newMockConnector( connection1, connection2 ); + SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest ); + + // acquire and release one connection + pool.acquire( ADDRESS_1 ).close(); + // make this connection seem idle for too long + clock.progress( idleTimeBeforeConnectionTest + 42 ); + + PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 ); + + Map auth = Collections.singletonMap( "Key", value( "Value" ) ); + acquiredConnection.init( "DummyClient", auth ); + verify( connection1, never() ).init( "DummyClient", auth ); + verify( connection2 ).init( "DummyClient", auth ); + + assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); + acquiredConnection.close(); + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void shouldForgetIdleConnection() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + doThrow( new RuntimeException() ).when( connection ).reset(); + + SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 ); + PooledConnection pooledConnection = pool.acquire( ADDRESS_1 ); + + // release the connection, it should fail to reset and be disposed + pooledConnection.close(); + + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + verify( connection ).close(); + } + private static Answer createConnectionAnswer( final Set createdConnections ) { return new Answer()