From f024d8bcb4a98f5701e5f5ae82cf74b2b9ebd5f6 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 23 Jan 2018 13:40:27 +0100 Subject: [PATCH 1/2] Fixed issue with connection pool deactivation Previously load balancer moved connection pool to deactivated state when corresponding cluster member had a network error. This was made to disallow any new connections towards that member. Member has also been removed from the routing table, so that callers never try to acquire connection from a deactivated pool. Deactivated pools were re-activated during rediscovery. However, there was a case when deactivated pool towards the seed router would remain deactivated forever without a chance to be re-activated. It happened when connections to all cores failed and driver had to perform rediscovery using seed router. In this case all connections pools were deactivated, and rediscovery was not able to complete because it failed trying to obtain connection from a deactivated pool. This commit fixes the problem by removing pool activation/deactivation. Instead driver will simply make instance non-routable by removing it's address from the routing table. Corresponding connection pool will not be changed. Later rediscovery will cleanup pools for non-routable addresses that have no active connections. --- .../internal/cluster/ClusterRoutingTable.java | 3 +- .../driver/internal/cluster/LoadBalancer.java | 17 +- .../driver/internal/cluster/RoutingTable.java | 4 + .../BlockingPooledConnectionQueue.java | 36 +-- .../net/pooling/SocketConnectionPool.java | 57 ++-- .../internal/security/TLSSocketChannel.java | 2 + .../driver/internal/spi/ConnectionPool.java | 8 +- .../cluster/ClusterCompositionUtil.java | 4 +- .../cluster/ClusterRoutingTableTest.java | 23 ++ .../internal/cluster/LoadBalancerTest.java | 51 +++- ...tingPooledConnectionErrorHandlingTest.java | 3 +- .../BlockingPooledConnectionQueueTest.java | 142 --------- .../net/pooling/SocketConnectionPoolTest.java | 158 +++------- .../util/ConnectionTrackingDriverFactory.java | 12 +- .../v1/integration/CausalClusteringIT.java | 285 +++++++++++++++--- .../v1/util/cc/LocalOrRemoteClusterRule.java | 2 +- 16 files changed, 400 insertions(+), 407 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index c88d4083c4..8c7254440e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -64,7 +64,8 @@ public boolean isStaleFor( AccessMode mode ) mode == AccessMode.WRITE && writers.size() == 0; } - private Set servers() + @Override + public Set servers() { Set servers = new HashSet<>(); servers.addAll( readers.servers() ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index 5679bf2ebf..273e6f0d3b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -110,16 +110,13 @@ private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSe private synchronized void forget( BoltServerAddress address ) { - // First remove from the load balancer, to prevent concurrent threads from making connections to them. + // remove from the routing table, to prevent concurrent threads from making connections to this address routingTable.forget( address ); + if ( PURGE_ON_ERROR ) { connections.purge( address ); } - else - { - connections.deactivate( address ); - } } synchronized void ensureRouting( AccessMode mode ) @@ -153,15 +150,7 @@ private void updateConnectionPool( RoutingTableChange routingTableChange ) } else { - for ( BoltServerAddress addedAddress : routingTableChange.added() ) - { - connections.activate( addedAddress ); - } - for ( BoltServerAddress removedAddress : routingTableChange.removed() ) - { - connections.deactivate( removedAddress ); - } - connections.compact(); + connections.retainAll( routingTable.servers() ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 0e60500011..c12c8668d0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.Set; + import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.v1.AccessMode; @@ -37,5 +39,7 @@ public interface RoutingTable int routerSize(); + Set servers(); + void removeWriter( BoltServerAddress toRemove ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index dfe5a72042..7241dda4b5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.logging.DelegatingLogger; import org.neo4j.driver.internal.net.BoltServerAddress; @@ -40,15 +40,11 @@ public class BlockingPooledConnectionQueue { public static final String LOG_NAME = "ConnectionQueue"; - private static final int ACTIVE = 1; - private static final int INACTIVE = 2; - private static final int TERMINATED = 3; - /** The backing queue, keeps track of connections currently in queue */ private final BlockingQueue queue; private final Logger logger; - private final AtomicInteger state = new AtomicInteger( ACTIVE ); + private final AtomicBoolean terminated = new AtomicBoolean(); /** Keeps track of acquired connections */ private final Set acquiredConnections = @@ -75,7 +71,7 @@ public boolean offer( PooledConnection pooledConnection ) { disposeSafely( pooledConnection ); } - if ( state.get() != ACTIVE ) + if ( terminated.get() ) { terminateIdleConnections(); } @@ -96,13 +92,11 @@ public PooledConnection acquire( Supplier supplier ) } acquiredConnections.add( connection ); - int poolState = state.get(); - if ( poolState != ACTIVE ) + if ( terminated.get() ) { acquiredConnections.remove( connection ); disposeSafely( connection ); - throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") + - ", new connections can't be acquired" ); + throw new IllegalStateException( "Pool is terminated, new connections can't be acquired" ); } else { @@ -131,24 +125,6 @@ public boolean contains( PooledConnection pooledConnection ) return queue.contains( pooledConnection ); } - public void activate() - { - state.compareAndSet( INACTIVE, ACTIVE ); - } - - public void deactivate() - { - if ( state.compareAndSet( ACTIVE, INACTIVE ) ) - { - terminateIdleConnections(); - } - } - - public boolean isActive() - { - return state.get() == ACTIVE; - } - /** * Terminates all connections, both those that are currently in the queue as well * as those that have been acquired. @@ -157,7 +133,7 @@ public boolean isActive() */ public void terminate() { - if ( state.getAndSet( TERMINATED ) != TERMINATED ) + if ( terminated.compareAndSet( false, true ) ) { terminateIdleConnections(); terminateAcquiredConnections(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index c197c2ccf3..d9187c38e4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -33,6 +32,7 @@ import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; /** @@ -61,6 +61,7 @@ public class SocketConnectionPool implements ConnectionPool private final ConnectionValidator connectionValidator; private final Clock clock; private final Logging logging; + private final Logger log; public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clock clock, Logging logging ) { @@ -69,6 +70,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo this.connectionValidator = new PooledConnectionValidator( this ); this.clock = clock; this.logging = logging; + this.log = logging.getLog( getClass().getSimpleName() ); } @Override @@ -93,37 +95,25 @@ public void purge( BoltServerAddress address ) } @Override - public void activate( BoltServerAddress address ) - { - BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - if ( connectionQueue != null ) - { - connectionQueue.activate(); - } - } - - @Override - public void deactivate( BoltServerAddress address ) - { - BlockingPooledConnectionQueue connections = pools.get( address ); - if ( connections != null ) - { - connections.deactivate(); - } - } - - @Override - public void compact() + public void retainAll( Set addressesToRetain ) { for ( Map.Entry entry : pools.entrySet() ) { BoltServerAddress address = entry.getKey(); - BlockingPooledConnectionQueue queue = entry.getValue(); + BlockingPooledConnectionQueue pool = entry.getValue(); - if ( !queue.isActive() && queue.activeConnections() == 0 ) + if ( !addressesToRetain.contains( address ) && pool.activeConnections() == 0 ) { - // queue has been in deactivated state and has no open connections by now - pools.remove( address ); + // address is not present in the updated routing table and has no active connections + // it's now safe to terminate corresponding connection pool and forget about it + + BlockingPooledConnectionQueue removedPool = pools.remove( address ); + if ( removedPool != null ) + { + log.info( "Closing connection pool towards %s, it has no active connections " + + "and is not in the routing table", address ); + removedPool.terminate(); + } } } } @@ -131,8 +121,7 @@ public void compact() @Override public boolean hasAddress( BoltServerAddress address ) { - BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - return connectionQueue != null && connectionQueue.isActive(); + return pools.containsKey( address ); } @Override @@ -152,17 +141,7 @@ public void close() public int activeConnections( BoltServerAddress address ) { BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - if ( connectionQueue == null || !connectionQueue.isActive() ) - { - return 0; - } - return connectionQueue.activeConnections(); - } - - // test-only accessor - Set addresses() - { - return Collections.unmodifiableSet( pools.keySet() ); + return connectionQueue == null ? 0 : connectionQueue.activeConnections(); } private BlockingPooledConnectionQueue pool( BoltServerAddress address ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java b/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java index 6886d45fe8..abfca9fe02 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java +++ b/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java @@ -372,6 +372,8 @@ private HandshakeStatus wrap( ByteBuffer buffer ) throws IOException, ClientExce cipherOut.compact(); } break; + case CLOSED: + throw new IOException( "TLS socket channel is closed" ); default: throw new ClientException( "Got unexpected status " + status ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index 2efd55e9d4..3481d0d677 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.spi; +import java.util.Set; + import org.neo4j.driver.internal.net.BoltServerAddress; public interface ConnectionPool extends AutoCloseable @@ -36,11 +38,7 @@ public interface ConnectionPool extends AutoCloseable */ void purge( BoltServerAddress address ); - void activate( BoltServerAddress address ); - - void deactivate( BoltServerAddress address ); - - void compact(); + void retainAll( Set addressesToRetain ); boolean hasAddress( BoltServerAddress address ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java index 9cc091ce7a..6d5afa1a37 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java @@ -18,7 +18,7 @@ */ package org.neo4j.driver.internal.cluster; -import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -41,7 +41,7 @@ private ClusterCompositionUtil() {} public static final BoltServerAddress E = new BoltServerAddress( "5555:55" ); public static final BoltServerAddress F = new BoltServerAddress( "6666:66" ); - public static final List EMPTY = new ArrayList<>(); + public static final List EMPTY = Collections.emptyList(); public static final ClusterComposition VALID_CLUSTER_COMPOSITION = createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 0c39d63a97..b51ca8cc82 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -20,7 +20,9 @@ import org.junit.Test; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.util.FakeClock; @@ -231,4 +233,25 @@ public void shouldNotRemoveServerIfPreWriterNowReader() assertEquals( 2, change.removed().size() ); assertThat( change.removed(), containsInAnyOrder( A, C ) ); } + + @Test + public void shouldReturnNoServersWhenEmpty() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + + Set servers = routingTable.servers(); + + assertEquals( 0, servers.size() ); + } + + @Test + public void shouldReturnAllServers() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + routingTable.update( createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ) ); + + Set servers = routingTable.servers(); + + assertEquals( new HashSet<>( asList( A, B, C, D, E, F ) ), servers ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index cab63b966a..1db51b93dd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.internal.util.SleeplessClock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -44,6 +45,7 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.instanceOf; @@ -62,6 +64,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.B; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; @@ -71,7 +81,7 @@ public class LoadBalancerTest { @Test - public void ensureRoutingShouldUpdateRoutingTableAndDeactivateConnectionPoolWhenStale() throws Exception + public void ensureRoutingShouldUpdateRoutingTableWhenStale() throws Exception { // given ConnectionPool conns = mock( ConnectionPool.class ); @@ -89,7 +99,6 @@ public void ensureRoutingShouldUpdateRoutingTableAndDeactivateConnectionPoolWhen InOrder inOrder = inOrder( rediscovery, routingTable, conns ); inOrder.verify( rediscovery ).lookupClusterComposition( routingTable, conns ); inOrder.verify( routingTable ).update( any( ClusterComposition.class ) ); - inOrder.verify( conns ).deactivate( new BoltServerAddress( "abc", 12 ) ); } @Test @@ -147,7 +156,7 @@ public void shouldAcquireReaderOrWriterConn() throws Exception } @Test - public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingTx() + public void shouldForgetAddressOnServiceUnavailableWhileClosingTx() { RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( RoutingTableChange.EMPTY ); @@ -172,11 +181,10 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing } verify( routingTable ).forget( address ); - verify( connectionPool ).deactivate( address ); } @Test - public void shouldForgetAddressAndItsIdleConnectionsOnServiceUnavailableWhileClosingSession() + public void shouldForgetAddressOnServiceUnavailableWhileClosingSession() { RoutingTable routingTable = mock( RoutingTable.class, RETURNS_MOCKS ); ConnectionPool connectionPool = mock( ConnectionPool.class ); @@ -193,7 +201,6 @@ public void shouldForgetAddressAndItsIdleConnectionsOnServiceUnavailableWhileClo session.close(); verify( routingTable ).forget( address ); - verify( connectionPool ).deactivate( address ); } @Test @@ -256,6 +263,27 @@ public void shouldThrowWhenRediscoveryReturnsNoSuitableServers() } } + @Test + public void shouldRetainConnectionsToAllAddressesFromReceivedRoutingTable() + { + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( LOCAL_DEFAULT ) ).thenReturn( mock( PooledConnection.class ) ); + + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock(), A ); + // initally a stale routing table without readers + ClusterComposition initalComposition = createClusterComposition( asList( A, B ), asList( B, C ), EMPTY ); + // then valid routing table with everything + ClusterComposition nextComposition = createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ); + Rediscovery rediscovery = newRediscoveryMock( initalComposition, nextComposition ); + + LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); + loadBalancer.acquireConnection( READ ); // requires rediscovery + + // rediscovery should be performed when load balancer is created and on read connection acquisition + verify( rediscovery, times( 2 ) ).lookupClusterComposition( routingTable, connections ); + verify( connections ).retainAll( new HashSet( asList( A, B, C, D, E, F ) ) ); + } + private void testRediscoveryWhenStale( AccessMode mode ) { ConnectionPool connections = mock( ConnectionPool.class ); @@ -350,11 +378,16 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) private static Rediscovery newRediscoveryMock() { - Rediscovery rediscovery = mock( Rediscovery.class ); - Set noServers = Collections.emptySet(); + Set noServers = Collections.emptySet(); ClusterComposition clusterComposition = new ClusterComposition( 1, noServers, noServers, noServers ); + return newRediscoveryMock( clusterComposition ); + } + + private static Rediscovery newRediscoveryMock( ClusterComposition initialComposition, ClusterComposition... otherCompositions ) + { + Rediscovery rediscovery = mock( Rediscovery.class ); when( rediscovery.lookupClusterComposition( any( RoutingTable.class ), any( ConnectionPool.class ) ) ) - .thenReturn( clusterComposition ); + .thenReturn( initialComposition, otherCompositions ); return rediscovery; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java index 932359fe0e..14d65be2cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java @@ -46,7 +46,6 @@ import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -232,7 +231,7 @@ private void verifyServiceUnavailableHandling( Connection connection, RoutingTab assertThat( routingTable, not( containsRouter( address ) ) ); assertThat( routingTable, not( containsReader( address ) ) ); assertThat( routingTable, not( containsWriter( address ) ) ); - assertFalse( connectionPool.hasAddress( address ) ); + assertTrue( connectionPool.hasAddress( address ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 665b24a69f..264785f602 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -28,14 +28,11 @@ import org.neo4j.driver.v1.Logging; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_MOCKS; @@ -302,139 +299,6 @@ public void shouldDisposeBrokenConnections() verify( connection ).dispose(); } - @Test - public void shouldFailToAcquireConnectionWhenDeactivated() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) ); - } - } - - @Test - public void shouldTerminateOfferedConnectionWhenDeactivated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - PooledConnection connection = mock( PooledConnection.class ); - queue.offer( connection ); - - verify( connection ).dispose(); - } - - @Test - public void shouldBeActiveWhenNotDeactivatedAndNotTerminated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveWhenDeactivated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.deactivate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveWhenTerminated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.terminate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldBeActiveAfterDeactivationAndActivation() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.deactivate(); - assertFalse( queue.isActive() ); - queue.activate(); - assertTrue( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveAfterTerminationAndActivation() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.terminate(); - assertFalse( queue.isActive() ); - queue.activate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldBePossibleToAcquireFromActivatedQueue() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) ); - } - - queue.activate(); - - assertNotNull( queue.acquire( connectionSupplier ) ); - } - - @Test - public void shouldNotBePossibleToActivateTerminatedQueue() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.terminate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); - } - - queue.activate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); - } - assertFalse( queue.isActive() ); - } - private static BlockingPooledConnectionQueue newConnectionQueue( int capacity ) { return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) ); @@ -444,10 +308,4 @@ private static BlockingPooledConnectionQueue newConnectionQueue( int capacity, L { return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, capacity, logging ); } - - @SuppressWarnings( "unchecked" ) - private static Supplier connectionSupplierMock() - { - return mock( Supplier.class ); - } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index 7144cf9538..9fe6ff7d18 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -23,11 +23,14 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +50,6 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; -import static java.util.Arrays.asList; import static java.util.Collections.newSetFromMap; import static java.util.Collections.singleton; import static org.hamcrest.Matchers.instanceOf; @@ -79,6 +81,7 @@ public class SocketConnectionPoolTest private static final BoltServerAddress ADDRESS_1 = LOCAL_DEFAULT; private static final BoltServerAddress ADDRESS_2 = new BoltServerAddress( "localhost", DEFAULT_PORT + 42 ); private static final BoltServerAddress ADDRESS_3 = new BoltServerAddress( "localhost", DEFAULT_PORT + 4242 ); + private static final BoltServerAddress ADDRESS_4 = new BoltServerAddress( "localhost", DEFAULT_PORT + 424242 ); @Test public void acquireCreatesNewConnectionWhenPoolIsEmpty() @@ -110,55 +113,6 @@ public void acquireUsesExistingConnectionIfPresent() verify( connector ).connect( ADDRESS_1 ); } - @Test - public void deactivateDoesNothingForNonExistingAddress() - { - Connection connection = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection ) ); - - pool.acquire( ADDRESS_1 ).close(); - - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - pool.deactivate( ADDRESS_2 ); - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - } - - @Test - public void deactivateRemovesAddress() - { - Connection connection = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection ) ); - - pool.acquire( ADDRESS_1 ).close(); - - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - pool.deactivate( ADDRESS_1 ); - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - } - - @Test - public void deactivateTerminatesIdleConnectionsInThePoolCorrespondingToTheAddress() - { - Connection connection1 = newConnectionMock( ADDRESS_1 ); - Connection connection2 = newConnectionMock( ADDRESS_1 ); - Connection connection3 = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection1, connection2, connection3 ) ); - - Connection pooledConnection1 = pool.acquire( ADDRESS_1 ); - Connection pooledConnection2 = pool.acquire( ADDRESS_1 ); - pool.acquire( ADDRESS_1 ); - - // return two connections to the pool - pooledConnection1.close(); - pooledConnection2.close(); - - pool.deactivate( ADDRESS_1 ); - - verify( connection1 ).close(); - verify( connection2 ).close(); - verify( connection3, never() ).close(); - } - @Test public void hasAddressReturnsFalseWhenPoolIsEmpty() { @@ -591,85 +545,37 @@ public void shouldForgetIdleConnection() } @Test - public void shouldDeactivateExistingPool() + public void shouldRetainAllGivenAddresses() { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertNotNull( pool.acquire( ADDRESS_1 ) ); - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); - - pool.deactivate( ADDRESS_1 ); - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - } - - @Test - public void shouldDeactivateNothingWhenPoolDoesNotExist() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - - pool.deactivate( ADDRESS_1 ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - } + SocketConnectionPool pool = newPool( newMockConnector(), new FakeClock(), 42 ); - @Test - public void shouldActivateExistingPool() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - assertNotNull( pool.acquire( ADDRESS_1 ) ); + pool.acquire( ADDRESS_1 ).close(); + pool.acquire( ADDRESS_2 ).close(); + pool.acquire( ADDRESS_3 ); - pool.deactivate( ADDRESS_1 ); - pool.activate( ADDRESS_1 ); + pool.retainAll( new HashSet( Arrays.asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) ); assertTrue( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); - } - - @Test - public void shouldActivateNothingWhenPoolDoesNotExist() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - - pool.activate( ADDRESS_1 ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + assertTrue( pool.hasAddress( ADDRESS_2 ) ); + assertTrue( pool.hasAddress( ADDRESS_3 ) ); } @Test - public void shouldRemoveDeactivatedPoolsWithoutConnectionsWhenCompacting() + public void shouldRemoveAllNonRetainedAddressesWithoutActiveConnections() { - SocketConnectionPool pool = newPool( newMockConnector( newConnectionMock( ADDRESS_1 ), - newConnectionMock( ADDRESS_1 ), newConnectionMock( ADDRESS_2 ), newConnectionMock( ADDRESS_3 ) ) ); - - PooledConnection connection1 = pool.acquire( ADDRESS_1 ); - PooledConnection connection2 = pool.acquire( ADDRESS_1 ); - PooledConnection connection3 = pool.acquire( ADDRESS_2 ); - PooledConnection connection4 = pool.acquire( ADDRESS_3 ); + SocketConnectionPool pool = newPool( newMockConnector(), new FakeClock(), 42 ); - assertEquals( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ), pool.addresses() ); - - pool.deactivate( ADDRESS_1 ); - pool.deactivate( ADDRESS_3 ); - - connection1.close(); - connection2.close(); - connection4.close(); - - assertEquals( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ), pool.addresses() ); + pool.acquire( ADDRESS_1 ).close(); + pool.acquire( ADDRESS_2 ).close(); + pool.acquire( ADDRESS_3 ); + pool.acquire( ADDRESS_4 ).close(); - pool.compact(); + pool.retainAll( singleton( ADDRESS_1 ) ); - assertEquals( singleton( ADDRESS_2 ), pool.addresses() ); + assertTrue( pool.hasAddress( ADDRESS_1 ) ); + assertFalse( pool.hasAddress( ADDRESS_2 ) ); + assertTrue( pool.hasAddress( ADDRESS_3 ) ); + assertFalse( pool.hasAddress( ADDRESS_4 ) ); } private static Answer createConnectionAnswer( final Set createdConnections ) @@ -711,7 +617,23 @@ private static Connector newMockConnector() private static Connector newMockConnector( Connection connection, Connection... otherConnections ) { Connector connector = mock( Connector.class ); - when( connector.connect( any( BoltServerAddress.class ) ) ).thenReturn( connection, otherConnections ); + + final Queue connectionsToReturn = new ArrayDeque<>(); + connectionsToReturn.add( connection ); + Collections.addAll( connectionsToReturn, otherConnections ); + + when( connector.connect( any( BoltServerAddress.class ) ) ).thenAnswer( new Answer() + { + @Override + public Connection answer( InvocationOnMock invocation ) throws Throwable + { + BoltServerAddress address = invocation.getArgumentAt( 0, BoltServerAddress.class ); + Connection connectionToReturn = connectionsToReturn.size() == 1 ? connectionsToReturn.peek() : connectionsToReturn.poll(); + when( connectionToReturn.boltServerAddress() ).thenReturn( address ); + return connectionToReturn; + } + } ); + return connector; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java index da20f1aecb..c093cd177f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.util; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +31,7 @@ public class ConnectionTrackingDriverFactory extends DriverFactoryWithClock { - private final Set connections = - Collections.newSetFromMap( new ConcurrentHashMap() ); + private final Set connections = Collections.newSetFromMap( new ConcurrentHashMap() ); public ConnectionTrackingDriverFactory( Clock clock ) { @@ -39,8 +39,7 @@ public ConnectionTrackingDriverFactory( Clock clock ) } @Override - protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, - Logging logging ) + protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging ) { Connector connector = super.createConnector( connectionSettings, securityPlan, logging ); return new ConnectionTrackingConnector( connector, connections ); @@ -48,10 +47,11 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu public void closeConnections() { - for ( Connection connection : connections ) + Set connectionsSnapshot = new HashSet<>( connections ); + connections.clear(); + for ( Connection connection : connectionsSnapshot ) { connection.close(); } - connections.clear(); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index c8e0e256af..e04f52d023 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.v1.integration; +import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -29,12 +30,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.retry.RetrySettings; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.ConnectionTrackingDriverFactory; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.internal.util.ThrowingConnection; @@ -44,11 +45,10 @@ import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.StatementRunner; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; @@ -62,17 +62,22 @@ import org.neo4j.driver.v1.util.cc.ClusterMemberRole; import org.neo4j.driver.v1.util.cc.ClusterRule; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.Values.parameters; +import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; public class CausalClusteringIT { @@ -81,6 +86,17 @@ public class CausalClusteringIT @Rule public final ClusterRule clusterRule = new ClusterRule(); + private ExecutorService executor; + + @After + public void tearDown() + { + if ( executor != null ) + { + executor.shutdownNow(); + } + } + @Test public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception { @@ -228,7 +244,7 @@ public void shouldDropBrokenOldSessions() throws Exception int livenessCheckTimeoutMinutes = 2; Config config = Config.build() - .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, MINUTES ) .withoutEncryption() .toConfig(); @@ -237,10 +253,9 @@ public void shouldDropBrokenOldSessions() throws Exception URI routingUri = cluster.leader().getRoutingUri(); AuthToken auth = clusterRule.getDefaultAuthToken(); - RoutingSettings routingSettings = new RoutingSettings( 1, SECONDS.toMillis( 5 ), null ); RetrySettings retrySettings = RetrySettings.DEFAULT; - try ( Driver driver = driverFactory.newInstance( routingUri, auth, routingSettings, retrySettings, config ) ) + try ( Driver driver = driverFactory.newInstance( routingUri, auth, defaultRoutingSettings(), retrySettings, config ) ) { // create nodes in different threads using different sessions createNodesInDifferentThreads( concurrentSessionsCount, driver ); @@ -248,7 +263,7 @@ public void shouldDropBrokenOldSessions() throws Exception // now pool contains many sessions, make them all invalid driverFactory.closeConnections(); // move clock forward more than configured liveness check timeout - clock.progress( TimeUnit.MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); + clock.progress( MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); // now all idle connections should be considered too old and will be verified during acquisition // they will appear broken because they were closed and new valid connection will be created @@ -464,7 +479,7 @@ public void shouldAcceptMultipleBookmarks() throws Exception Cluster cluster = clusterRule.getCluster(); ClusterMember leader = cluster.leader(); - ExecutorService executor = Executors.newCachedThreadPool(); + executor = newExecutor(); try ( Driver driver = createDriver( leader.getRoutingUri() ) ) { @@ -486,7 +501,7 @@ public void shouldAcceptMultipleBookmarks() throws Exception try ( Session session = driver.session( AccessMode.READ, bookmarks ) ) { int count = countNodes( session, label, property, value ); - assertEquals( count, threadCount ); + assertEquals( threadCount, count ); } } } @@ -498,11 +513,8 @@ public void shouldAllowExistingTransactionToCompleteAfterDifferentConnectionBrea ClusterMember leader = cluster.leader(); ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory(); - RoutingSettings routingSettings = new RoutingSettings( 1, SECONDS.toMillis( 5 ), null ); - Config config = Config.build().toConfig(); - try ( Driver driver = driverFactory.newInstance( leader.getRoutingUri(), clusterRule.getDefaultAuthToken(), - routingSettings, RetrySettings.DEFAULT, config ) ) + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) { Session session1 = driver.session(); Transaction tx1 = session1.beginTransaction(); @@ -536,6 +548,111 @@ public void shouldAllowExistingTransactionToCompleteAfterDifferentConnectionBrea } } + @Test + public void shouldRediscoverWhenConnectionsToAllCoresBreak() + { + Cluster cluster = clusterRule.getCluster(); + ClusterMember leader = cluster.leader(); + + ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory(); + try ( Driver driver = driverFactory.newInstance( leader.getRoutingUri(), clusterRule.getDefaultAuthToken(), + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) + { + try ( Session session = driver.session() ) + { + createNode( session, "Person", "name", "Vision" ); + + // force driver to connect to every cluster member + for ( int i = 0; i < cluster.members().size(); i++ ) + { + assertEquals( 1, countNodes( session, "Person", "name", "Vision" ) ); + } + } + + // now driver should have connection pools towards every cluster member + // make all those connections throw and seem broken + for ( ThrowingConnection connection : driverFactory.getConnections() ) + { + connection.setNextRunFailure( new ServiceUnavailableException( "Disconnected" ) ); + } + + // observe that connection towards writer is broken + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + try + { + runCreateNode( session, "Person", "name", "Vision" ); + fail( "Exception expected" ); + } + catch ( SessionExpiredException e ) + { + assertEquals( "Disconnected", e.getCause().getMessage() ); + } + } + + // probe connections to all readers + int readersCount = cluster.followers().size() + cluster.readReplicas().size(); + for ( int i = 0; i < readersCount; i++ ) + { + try ( Session session = driver.session( AccessMode.READ ) ) + { + runCountNodes( session, "Person", "name", "Vision" ); + } + catch ( Throwable ignore ) + { + } + } + + try ( Session session = driver.session() ) + { + updateNode( session, "Person", "name", "Vision", "Thanos" ); + assertEquals( 0, countNodes( session, "Person", "name", "Vision" ) ); + assertEquals( 1, countNodes( session, "Person", "name", "Thanos" ) ); + } + } + } + + @Test + public void shouldKeepOperatingWhenConnectionsBreak() throws Exception + { + String label = "Person"; + String property = "name"; + String value = "Tony Stark"; + Cluster cluster = clusterRule.getCluster(); + + ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory( Clock.SYSTEM ); + AtomicBoolean stop = new AtomicBoolean(); + executor = newExecutor(); + + try ( Driver driver = driverFactory.newInstance( cluster.leader().getRoutingUri(), clusterRule.getDefaultAuthToken(), + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) + { + List> results = new ArrayList<>(); + + // launch writers and readers that use transaction functions and thus should never fail + for ( int i = 0; i < 3; i++ ) + { + results.add( executor.submit( countNodesCallable( driver, label, property, value, stop ) ) ); + } + for ( int i = 0; i < 2; i++ ) + { + results.add( executor.submit( createNodesCallable( driver, label, property, value, stop ) ) ); + } + + // terminate connections while reads and writes are in progress + long deadline = System.currentTimeMillis() + MINUTES.toMillis( 1 ); + while ( System.currentTimeMillis() < deadline && !stop.get() ) + { + driverFactory.closeConnections(); + SECONDS.sleep( 5 ); // sleep a bit to allow readers and writers to progress + } + stop.set( true ); + + awaitAll( results ); // readers and writers should stop + assertThat( countNodes( driver.session(), label, property, value ), greaterThan( 0 ) ); // some nodes should be created + } + } + private static void closeTx( Transaction tx ) { tx.success(); @@ -713,36 +830,19 @@ else if ( role == ClusterMemberRole.READ_REPLICA ) private Driver createDriver( URI boltUri ) { - Logging devNullLogging = new Logging() - { - @Override - public Logger getLog( String name ) - { - return DevNullLogger.DEV_NULL_LOGGER; - } - }; - - Config config = Config.build() - .withLogging( devNullLogging ) - .toConfig(); - - return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config ); + return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } private Driver discoverDriver( List routingUris ) { - Config config = Config.build() - .withLogging( DEV_NULL_LOGGING ) - .toConfig(); - - return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), config ); + return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception { final CountDownLatch beforeRunLatch = new CountDownLatch( count ); final CountDownLatch runQueryLatch = new CountDownLatch( 1 ); - final ExecutorService executor = Executors.newCachedThreadPool(); + final ExecutorService executor = newExecutor(); for ( int i = 0; i < count; i++ ) { @@ -766,7 +866,7 @@ public Void call() throws Exception runQueryLatch.countDown(); executor.shutdown(); - assertTrue( executor.awaitTermination( 1, TimeUnit.MINUTES ) ); + assertTrue( executor.awaitTermination( 1, MINUTES ) ); } private static void closeAndExpectException( AutoCloseable closeable, Class exceptionClass ) @@ -789,9 +889,7 @@ private static int countNodes( Session session, final String label, final String @Override public Integer execute( Transaction tx ) { - StatementResult result = tx.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", - parameters( "value", value ) ); - return result.single().get( 0 ).asInt(); + return runCountNodes( tx, label, property, value ); } } ); } @@ -827,4 +925,115 @@ public Void execute( Transaction tx ) return localSession.lastBookmark(); } } + + private static Callable createNodesCallable( final Driver driver, final String label, final String property, final String value, + final AtomicBoolean stop ) + { + return new Callable() + { + @Override + public Void call() throws Exception + { + while ( !stop.get() ) + { + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + createNode( session, label, property, value ); + } + catch ( Throwable t ) + { + stop.set( true ); + throw t; + } + } + return null; + } + }; + } + + private static Callable countNodesCallable( final Driver driver, final String label, final String property, final String value, + final AtomicBoolean stop ) + { + return new Callable() + { + @Override + public Void call() throws Exception + { + while ( !stop.get() ) + { + try ( Session session = driver.session( AccessMode.READ ) ) + { + countNodes( session, label, property, value ); + } + catch ( Throwable t ) + { + stop.set( true ); + throw t; + } + } + return null; + } + }; + } + + private static void createNode( final Session session, final String label, final String property, final String value ) + { + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + runCreateNode( tx, label, property, value ); + return null; + } + } ); + } + + private static void updateNode( final Session session, final String label, final String property, final String oldValue, final String newValue ) + { + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + tx.run( "MATCH (n: " + label + '{' + property + ": $oldValue}) SET n." + property + " = $newValue", + parameters( "oldValue", oldValue, "newValue", newValue ) ); + return null; + } + } ); + } + + private static void runCreateNode( StatementRunner statementRunner, String label, String property, String value ) + { + statementRunner.run( "CREATE (n:" + label + ") SET n." + property + " = $value", parameters( "value", value ) ); + } + + private static int runCountNodes( StatementRunner statementRunner, String label, String property, String value ) + { + StatementResult result = statementRunner.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", parameters( "value", value ) ); + return result.single().get( 0 ).asInt(); + } + + private static RoutingSettings defaultRoutingSettings() + { + return new RoutingSettings( 1, SECONDS.toMillis( 1 ), null ); + } + + private static Config configWithoutLogging() + { + return Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + } + + private static ExecutorService newExecutor() + { + return Executors.newCachedThreadPool( daemon( CausalClusteringIT.class.getSimpleName() + "-thread-" ) ); + } + + private static void awaitAll( List> results ) throws Exception + { + for ( Future result : results ) + { + assertNull( result.get( DEFAULT_TIMEOUT_MS, MILLISECONDS ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java index bc21acfb81..e41a4b20f8 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java @@ -92,7 +92,7 @@ private static void assertValidSystemPropertiesDefined() } if ( uri != null && !BOLT_ROUTING_URI_SCHEME.equals( uri.getScheme() ) ) { - throw new IllegalStateException( "CLuster uri should have bolt+routing scheme: '" + uri + "'" ); + throw new IllegalStateException( "Cluster uri should have bolt+routing scheme: '" + uri + "'" ); } } From 8154bdcd85b444e72dc364408c02004a1fc2cec6 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 24 Jan 2018 10:37:27 +0100 Subject: [PATCH 2/2] Do not use `/tmp` for tests --- .../internal/security/TrustOnFirstUseTrustManagerTest.java | 2 +- .../java/org/neo4j/driver/v1/integration/CredentialsIT.java | 3 ++- .../org/neo4j/driver/v1/integration/TLSSocketChannelIT.java | 2 +- .../java/org/neo4j/driver/v1/tck/DriverComplianceIT.java | 5 ----- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java b/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java index 7369434906..2c98d225f2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java @@ -54,7 +54,7 @@ public class TrustOnFirstUseTrustManagerTest private String knownServer; @Rule - public TemporaryFolder testDir = new TemporaryFolder(); + public TemporaryFolder testDir = new TemporaryFolder( new File( "target" ) ); private X509Certificate knownCertificate; @Before diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java index 2297c4013f..c8992cdd73 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; import java.util.HashMap; import org.neo4j.driver.internal.security.InternalAuthToken; @@ -52,7 +53,7 @@ public class CredentialsIT { @ClassRule - public static TemporaryFolder tempDir = new TemporaryFolder(); + public static TemporaryFolder tempDir = new TemporaryFolder( new File( "target" ) ); @ClassRule public static TestNeo4j neo4j = new TestNeo4j(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java index 64ad23eed5..4b9c691be0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java @@ -66,7 +66,7 @@ public class TLSSocketChannelIT public TestNeo4j neo4j = new TestNeo4j(); @Rule - public TemporaryFolder folder = new TemporaryFolder(); + public TemporaryFolder folder = new TemporaryFolder( new File( "target" ) ); @BeforeClass public static void setup() throws IOException, InterruptedException diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java index 3d573cb4ca..d7fd766931 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java @@ -20,8 +20,6 @@ import cucumber.api.CucumberOptions; import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import java.io.IOException; @@ -36,9 +34,6 @@ format = {"default_summary"}) public class DriverComplianceIT { - @Rule - TemporaryFolder folder = new TemporaryFolder(); - @ClassRule public static TestNeo4j neo4j = new TestNeo4j();