2828import java .lang .reflect .Method ;
2929import java .net .URI ;
3030import java .util .ArrayList ;
31+ import java .util .HashMap ;
3132import java .util .HashSet ;
3233import java .util .List ;
34+ import java .util .Map ;
3335import java .util .Set ;
3436import java .util .concurrent .Callable ;
3537import java .util .concurrent .ConcurrentHashMap ;
38+ import java .util .concurrent .ConcurrentMap ;
3639import java .util .concurrent .ExecutorService ;
3740import java .util .concurrent .Executors ;
3841import java .util .concurrent .Future ;
4245import java .util .concurrent .atomic .AtomicLong ;
4346
4447import org .neo4j .driver .internal .logging .DevNullLogger ;
48+ import org .neo4j .driver .internal .util .ServerVersion ;
4549import org .neo4j .driver .v1 .AccessMode ;
4650import org .neo4j .driver .v1 .AuthToken ;
4751import org .neo4j .driver .v1 .Config ;
5761import org .neo4j .driver .v1 .exceptions .SecurityException ;
5862import org .neo4j .driver .v1 .types .Node ;
5963import org .neo4j .driver .v1 .util .DaemonThreadFactory ;
64+ import org .neo4j .driver .v1 .util .cc .ClusterMemberRole ;
6065import org .neo4j .driver .v1 .util .cc .LocalOrRemoteClusterRule ;
6166
6267import static java .util .Collections .newSetFromMap ;
68+ import static org .hamcrest .Matchers .both ;
6369import static org .hamcrest .Matchers .containsString ;
6470import static org .hamcrest .Matchers .equalTo ;
71+ import static org .hamcrest .Matchers .greaterThan ;
72+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
6573import static org .hamcrest .Matchers .instanceOf ;
6674import static org .hamcrest .Matchers .lessThanOrEqualTo ;
6775import static org .junit .Assert .assertEquals ;
7179import static org .junit .Assert .fail ;
7280import static org .neo4j .driver .internal .util .Iterables .single ;
7381import static org .neo4j .driver .v1 .AuthTokens .basic ;
82+ import static org .neo4j .driver .v1 .util .cc .ClusterMember .SIMPLE_SCHEME ;
7483
7584public class CausalClusteringStressIT
7685{
@@ -138,6 +147,7 @@ public void basicStressTest() throws Throwable
138147 assertNoFileDescriptorLeak ( resourcesInfo .openFileDescriptorCount );
139148 assertNoLoggersLeak ( resourcesInfo .acquiredLoggerNames );
140149 assertExpectedNumberOfNodesCreated ( context .getCreatedNodesCount () );
150+ assertGoodReadQueryDistribution ( context .getReadQueriesByServer () );
141151 }
142152
143153 private List <Future <?>> launchWorkerThreads ( Context context )
@@ -235,6 +245,96 @@ private void assertExpectedNumberOfNodesCreated( long expectedCount )
235245 }
236246 }
237247
248+ private void assertGoodReadQueryDistribution ( Map <String ,Long > readQueriesByServer )
249+ {
250+ ClusterAddresses clusterAddresses = fetchClusterAddresses ( driver );
251+
252+ // before 3.2.0 only read replicas serve reads
253+ boolean readsOnFollowersEnabled = ServerVersion .version ( driver ).greaterThanOrEqual ( ServerVersion .v3_2_0 );
254+
255+ if ( readsOnFollowersEnabled )
256+ {
257+ // expect all followers to serve more than zero read queries
258+ assertAllAddressesServedReadQueries ( "Follower" , clusterAddresses .followers , readQueriesByServer );
259+ }
260+
261+ // expect all read replicas to serve more than zero read queries
262+ assertAllAddressesServedReadQueries ( "Read replica" , clusterAddresses .readReplicas , readQueriesByServer );
263+
264+ if ( readsOnFollowersEnabled )
265+ {
266+ // expect all followers to serve same order of magnitude read queries
267+ assertAllAddressesServedSimilarAmountOfReadQueries ( "Followers" , clusterAddresses .followers ,
268+ readQueriesByServer , clusterAddresses );
269+ }
270+
271+ // expect all read replicas to serve same order of magnitude read queries
272+ assertAllAddressesServedSimilarAmountOfReadQueries ( "Read replicas" , clusterAddresses .readReplicas ,
273+ readQueriesByServer , clusterAddresses );
274+ }
275+
276+ private static ClusterAddresses fetchClusterAddresses ( Driver driver )
277+ {
278+ Set <String > followers = new HashSet <>();
279+ Set <String > readReplicas = new HashSet <>();
280+
281+ try ( Session session = driver .session () )
282+ {
283+ List <Record > records = session .run ( "CALL dbms.cluster.overview()" ).list ();
284+ for ( Record record : records )
285+ {
286+ List <Object > addresses = record .get ( "addresses" ).asList ();
287+ String boltAddress = ((String ) addresses .get ( 0 )).replace ( SIMPLE_SCHEME , "" );
288+
289+ ClusterMemberRole role = ClusterMemberRole .valueOf ( record .get ( "role" ).asString () );
290+ if ( role == ClusterMemberRole .FOLLOWER )
291+ {
292+ followers .add ( boltAddress );
293+ }
294+ else if ( role == ClusterMemberRole .READ_REPLICA )
295+ {
296+ readReplicas .add ( boltAddress );
297+ }
298+ }
299+ }
300+
301+ return new ClusterAddresses ( followers , readReplicas );
302+ }
303+
304+ private static void assertAllAddressesServedReadQueries ( String addressType , Set <String > addresses ,
305+ Map <String ,Long > readQueriesByServer )
306+ {
307+ for ( String address : addresses )
308+ {
309+ Long queries = readQueriesByServer .get ( address );
310+ assertThat ( addressType + " did not serve any read queries" , queries , greaterThan ( 0L ) );
311+ }
312+ }
313+
314+ private static void assertAllAddressesServedSimilarAmountOfReadQueries ( String addressesType , Set <String > addresses ,
315+ Map <String ,Long > readQueriesByServer , ClusterAddresses allAddresses )
316+ {
317+ long expectedOrderOfMagnitude = -1 ;
318+ for ( String address : addresses )
319+ {
320+ long queries = readQueriesByServer .get ( address );
321+ long orderOfMagnitude = orderOfMagnitude ( queries );
322+ if ( expectedOrderOfMagnitude == -1 )
323+ {
324+ expectedOrderOfMagnitude = orderOfMagnitude ;
325+ }
326+ else
327+ {
328+ assertThat ( addressesType + " are expected to serve similar amount of queries. " +
329+ "Addresses: " + allAddresses + ", " +
330+ "read queries served: " + readQueriesByServer ,
331+ orderOfMagnitude ,
332+ both ( greaterThanOrEqualTo ( expectedOrderOfMagnitude - 1 ) )
333+ .and ( lessThanOrEqualTo ( expectedOrderOfMagnitude + 1 ) ) );
334+ }
335+ }
336+ }
337+
238338 private static long getOpenFileDescriptorCount ()
239339 {
240340 try
@@ -260,11 +360,23 @@ private static Throwable withSuppressed( Throwable firstError, Throwable newErro
260360 return firstError ;
261361 }
262362
363+ private static long orderOfMagnitude ( long number )
364+ {
365+ long result = 1 ;
366+ while ( number >= 10 )
367+ {
368+ number /= 10 ;
369+ result ++;
370+ }
371+ return result ;
372+ }
373+
263374 private static class Context
264375 {
265376 volatile boolean stopped ;
266377 volatile String bookmark ;
267378 final AtomicLong createdNodesCount = new AtomicLong ();
379+ final ConcurrentMap <String ,AtomicLong > readQueriesByServer = new ConcurrentHashMap <>();
268380
269381 boolean isStopped ()
270382 {
@@ -295,6 +407,33 @@ long getCreatedNodesCount()
295407 {
296408 return createdNodesCount .get ();
297409 }
410+
411+ void readCompleted ( StatementResult result )
412+ {
413+ String serverAddress = result .summary ().server ().address ();
414+
415+ AtomicLong count = readQueriesByServer .get ( serverAddress );
416+ if ( count == null )
417+ {
418+ count = new AtomicLong ();
419+ AtomicLong existingCounter = readQueriesByServer .putIfAbsent ( serverAddress , count );
420+ if ( existingCounter != null )
421+ {
422+ count = existingCounter ;
423+ }
424+ }
425+ count .incrementAndGet ();
426+ }
427+
428+ Map <String ,Long > getReadQueriesByServer ()
429+ {
430+ Map <String ,Long > result = new HashMap <>();
431+ for ( Map .Entry <String ,AtomicLong > entry : readQueriesByServer .entrySet () )
432+ {
433+ result .put ( entry .getKey (), entry .getValue ().get () );
434+ }
435+ return result ;
436+ }
298437 }
299438
300439 private interface Command
@@ -343,6 +482,8 @@ public void execute( Context context )
343482 Node node = record .get ( 0 ).asNode ();
344483 assertNotNull ( node );
345484 }
485+
486+ context .readCompleted ( result );
346487 }
347488 }
348489 }
@@ -368,6 +509,8 @@ public void execute( Context context )
368509 Node node = record .get ( 0 ).asNode ();
369510 assertNotNull ( node );
370511 }
512+
513+ context .readCompleted ( result );
371514 tx .success ();
372515 }
373516 }
@@ -529,9 +672,30 @@ public Logger getLog( String name )
529672 return DevNullLogger .DEV_NULL_LOGGER ;
530673 }
531674
532- public Set <String > getAcquiredLoggerNames ()
675+ Set <String > getAcquiredLoggerNames ()
533676 {
534677 return new HashSet <>( acquiredLoggerNames );
535678 }
536679 }
680+
681+ private static class ClusterAddresses
682+ {
683+ final Set <String > followers ;
684+ final Set <String > readReplicas ;
685+
686+ ClusterAddresses ( Set <String > followers , Set <String > readReplicas )
687+ {
688+ this .followers = followers ;
689+ this .readReplicas = readReplicas ;
690+ }
691+
692+ @ Override
693+ public String toString ()
694+ {
695+ return "ClusterAddresses{" +
696+ "followers=" + followers +
697+ ", readReplicas=" + readReplicas +
698+ '}' ;
699+ }
700+ }
537701}
0 commit comments