diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 5103ca3534..cdbf69962e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -43,8 +43,7 @@ public class ClusterRoutingTable implements RoutingTable public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses ) { this( clock ); - routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet(), - new HashSet() ); + routers.update( new LinkedHashSet<>( asList( routingAddresses ) )); } private ClusterRoutingTable( Clock clock ) @@ -66,16 +65,29 @@ public boolean isStaleFor( AccessMode mode ) mode == AccessMode.WRITE && writers.size() == 0; } + private Set servers() + { + Set servers = new HashSet<>(); + servers.addAll( readers.servers() ); + servers.addAll( writers.servers() ); + servers.addAll( routers.servers() ); + return servers; + } + @Override public synchronized RoutingTableChange update( ClusterComposition cluster ) { expirationTimeout = cluster.expirationTimestamp(); - // todo: what if server is added as reader and removed as writer? we should not treat it as removed - Set added = new HashSet<>(); - Set removed = new HashSet<>(); - readers.update( cluster.readers(), added, removed ); - writers.update( cluster.writers(), added, removed ); - routers.update( cluster.routers(), added, removed ); + Set pre = servers(); + readers.update( cluster.readers() ); + writers.update( cluster.writers() ); + routers.update( cluster.routers() ); + Set cur = servers(); + + Set added = new HashSet<>( cur ); + Set removed = new HashSet<>( pre ); + added.removeAll( pre ); + removed.removeAll( cur ); return new RoutingTableChange( added, removed ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java index ed0a6ac98e..367cdbb55f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java @@ -19,7 +19,7 @@ package org.neo4j.driver.internal.cluster; import java.util.Arrays; -import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +46,11 @@ public BoltServerAddress next() return addresses[next( addresses.length )]; } + public Set servers() + { + return new HashSet<>( Arrays.asList( addresses ) ); + } + int next( int divisor ) { int index = offset.getAndIncrement(); @@ -56,58 +61,9 @@ int next( int divisor ) return index % divisor; } - public synchronized void update( Set addresses, Set added, - Set removed ) + public synchronized void update( Set addresses ) { - BoltServerAddress[] prev = this.addresses; - if ( addresses.isEmpty() ) - { - this.addresses = NONE; - Collections.addAll( removed, prev ); - return; - } - if ( prev.length == 0 ) - { - this.addresses = addresses.toArray( NONE ); - Collections.addAll( added, this.addresses ); - return; - } - BoltServerAddress[] copy = null; - if ( addresses.size() != prev.length ) - { - copy = new BoltServerAddress[addresses.size()]; - } - int j = 0; - for ( int i = 0; i < prev.length; i++ ) - { - if ( addresses.remove( prev[i] ) ) - { - if ( copy != null ) - { - copy[j++] = prev[i]; - } - } - else - { - removed.add( prev[i] ); - if ( copy == null ) - { - copy = new BoltServerAddress[prev.length]; - System.arraycopy( prev, 0, copy, 0, i ); - j = i; - } - } - } - if ( copy == null ) - { - return; - } - for ( BoltServerAddress address : addresses ) - { - copy[j++] = address; - added.add( address ); - } - this.addresses = copy; + this.addresses = addresses.toArray( NONE ); } public synchronized void remove( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 3a883b89bd..099af4159b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -215,4 +215,20 @@ public void shouldReturnCorrectChangeWhenUpdated() assertEquals( 2, change.removed().size() ); assertThat( change.removed(), containsInAnyOrder( A, D ) ); } + + @Test + public void shouldNotRemoveServerIfPreWriterNowReader() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + routingTable.update( createClusterComposition( singletonList( A ), singletonList( B ), singletonList( C ) ) ); + + ClusterComposition newComposition = + createClusterComposition( singletonList( D ), singletonList( E ), singletonList( B ) ); + RoutingTableChange change = routingTable.update( newComposition ); + + assertEquals( 2, change.added().size() ); + assertThat( change.added(), containsInAnyOrder( D, E ) ); + assertEquals( 2, change.removed().size() ); + assertThat( change.removed(), containsInAnyOrder( A, C ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 8ed9e6fced..fb117a071b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -341,8 +341,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( RoutingTableChange.EMPTY ); RoundRobinAddressSet addresses = new RoundRobinAddressSet(); - addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet(), - new HashSet() ); + addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) )); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java index 85a8b74566..e89fd7e37f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java @@ -21,7 +21,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -29,7 +28,6 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import static java.util.Arrays.asList; -import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; @@ -57,7 +55,7 @@ public void shouldReturnRoundRobin() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); + set.update( addresses ); // when BoltServerAddress a = set.next(); @@ -85,7 +83,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 4 + 1; i-- > 0; ) @@ -100,7 +98,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception // when servers.add( new BoltServerAddress( "fyr" ) ); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); // then assertEquals( order.get( 1 ), set.next() ); @@ -126,7 +124,7 @@ public void shouldPreserveOrderWhenRemoving() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 2 + 1; i-- > 0; ) @@ -158,7 +156,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 2 + 1; i-- > 0; ) @@ -173,7 +171,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception // when servers.remove( order.get( 1 ) ); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); // then assertEquals( order.get( 2 ), set.next() ); @@ -182,106 +180,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception assertEquals( order.get( 0 ), set.next() ); } - @Test - public void shouldRecordRemovedAddressesWhenUpdating() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - Set addresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); - - // when - Set removed = new HashSet<>(); - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "fyr" ) ) ); - set.update( newAddresses, new HashSet(), removed ); - - // then - assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); - } - @Test - public void shouldRecordRemovedAddressesWhenUpdateIsEmpty() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - Set addresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); - - Set update = Collections.emptySet(); - Set removed = new HashSet<>(); - set.update( update, new HashSet(), removed ); - - assertEquals( addresses, removed ); - } - - @Test - public void shouldRecordAddedAddressesWhenUpdatingAnEmptySet() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set added1 = new HashSet<>(); - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses1, added1, new HashSet() ); - - assertEquals( addresses1, added1 ); - } - - @Test - public void shouldRecordAddedAddressesWhenUpdating() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses1, new HashSet(), new HashSet() ); - - Set added = new HashSet<>(); - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "tre" ), - new BoltServerAddress( "four" ) ) ); - set.update( newAddresses, added, new HashSet() ); - - assertEquals( singleton( new BoltServerAddress( "four" ) ), added ); - } - - @Test - public void shouldRecordBothAddedAndRemovedAddressesWhenUpdating() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "three" ) ) ); - set.update( addresses1, new HashSet(), new HashSet() ); - - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "two" ), - new BoltServerAddress( "four" ), - new BoltServerAddress( "five" ) ) ); - - Set added = new HashSet<>(); - Set removed = new HashSet<>(); - set.update( newAddresses, added, removed ); - - assertEquals( - new HashSet<>( asList( new BoltServerAddress( "four" ), new BoltServerAddress( "five" ) ) ), added ); - assertEquals( - new HashSet<>( asList( new BoltServerAddress( "one" ), new BoltServerAddress( "three" ) ) ), removed ); - } @Test public void shouldPreserveOrderEvenWhenIntegerOverflows() throws Exception