Skip to content
21 changes: 19 additions & 2 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.cluster.LoadBalancer;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
Expand Down Expand Up @@ -146,7 +149,21 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
{
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
createLoadBalancingStrategy( config, connectionPool ) );
}

private LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
{
switch ( config.loadBalancingStrategy() )
{
case ROUND_ROBIN:
return new RoundRobinLoadBalancingStrategy( config.logging() );
case LEAST_CONNECTED:
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
default:
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,23 @@

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinAddressSet
public class AddressSet
{
private static final BoltServerAddress[] NONE = {};
private final AtomicInteger offset = new AtomicInteger();
private volatile BoltServerAddress[] addresses = NONE;

public int size()
{
return addresses.length;
}
private volatile BoltServerAddress[] addresses = NONE;

public BoltServerAddress next()
public BoltServerAddress[] toArray()
{
BoltServerAddress[] addresses = this.addresses;
if ( addresses.length == 0 )
{
return null;
}
return addresses[next( addresses.length )];
return addresses;
}

int next( int divisor )
public int size()
{
int index = offset.getAndIncrement();
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
{
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
}
return index % divisor;
return addresses.length;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
Expand Down Expand Up @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
@Override
public String toString()
{
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
}

/** breaking encapsulation in order to perform white-box testing of boundary case */
void setOffset( int target )
{
offset.set( target );
return "AddressSet=" + Arrays.toString( addresses );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.util.Function;

final class ClusterComposition
public final class ClusterComposition
{
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
Expand All @@ -53,7 +53,7 @@ private ClusterComposition( long expirationTimestamp )
}

/** For testing */
ClusterComposition(
public ClusterComposition(
long expirationTimestamp,
Set<BoltServerAddress> readers,
Set<BoltServerAddress> writers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable

private final Clock clock;
private volatile long expirationTimeout;
private final RoundRobinAddressSet readers;
private final RoundRobinAddressSet writers;
private final RoundRobinAddressSet routers;
private final AddressSet readers;
private final AddressSet writers;
private final AddressSet routers;

public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
Expand All @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
this.clock = clock;
this.expirationTimeout = clock.millis() - 1;

this.readers = new RoundRobinAddressSet();
this.writers = new RoundRobinAddressSet();
this.routers = new RoundRobinAddressSet();
this.readers = new AddressSet();
this.writers = new AddressSet();
this.routers = new AddressSet();
}

@Override
Expand Down Expand Up @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
}

@Override
public RoundRobinAddressSet readers()
public AddressSet readers()
{
return readers;
}

@Override
public RoundRobinAddressSet writers()
public AddressSet writers()
{
return writers;
}

@Override
public BoltServerAddress nextRouter()
public AddressSet routers()
{
return routers.next();
}

@Override
public int routerSize()
{
return routers.size();
return routers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
Set<BoltServerAddress> seenServers )
{
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
break;
}
BoltServerAddress[] addresses = routingTable.routers().toArray();

for ( BoltServerAddress address : addresses )
{
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@

import static java.lang.String.format;

class RoutingPooledConnection implements PooledConnection
public class RoutingPooledConnection implements PooledConnection
{
private final PooledConnection delegate;
private final RoutingErrorHandler errorHandler;
private final AccessMode accessMode;

RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
public RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
{
this.delegate = delegate;
this.errorHandler = errorHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ public interface RoutingTable

void forget( BoltServerAddress address );

RoundRobinAddressSet readers();
AddressSet readers();

RoundRobinAddressSet writers();
AddressSet writers();

BoltServerAddress nextRouter();

int routerSize();
AddressSet routers();

void removeWriter( BoltServerAddress toRemove );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster.loadbalancing;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;

/**
* Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from
* given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent
* choosing same first address over and over when all addresses have same amount of active connections.
*/
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
{
private static final String LOGGER_NAME = LeastConnectedLoadBalancingStrategy.class.getSimpleName();

private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();

private final ConnectionPool connectionPool;
private final Logger log;

public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Logging logging )
{
this.connectionPool = connectionPool;
this.log = logging.getLog( LOGGER_NAME );
}

@Override
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
{
return select( knownReaders, readersIndex, "reader" );
}

@Override
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
{
return select( knownWriters, writersIndex, "writer" );
}

private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex,
String addressType )
{
int size = addresses.length;
if ( size == 0 )
{
log.trace( "Unable to select %s, no known addresses given", addressType );
return null;
}

// choose start index for iteration in round-rodin fashion
int startIndex = addressesIndex.next( size );
int index = startIndex;

BoltServerAddress leastConnectedAddress = null;
int leastActiveConnections = Integer.MAX_VALUE;

// iterate over the array to find least connected address
do
{
BoltServerAddress address = addresses[index];
int activeConnections = connectionPool.activeConnections( address );

if ( activeConnections < leastActiveConnections )
{
leastConnectedAddress = address;
leastActiveConnections = activeConnections;
}

// loop over to the start of the array when end is reached
if ( index == size - 1 )
{
index = 0;
}
else
{
index++;
}
}
while ( index != startIndex );

log.trace( "Selected %s with address: '%s' and active connections: %s",
addressType, leastConnectedAddress, leastActiveConnections );

return leastConnectedAddress;
}
}
Loading