1616 * See the License for the specific language governing permissions and
1717 * limitations under the License.
1818 */
19-
2019package org .neo4j .driver .internal ;
2120
22- import java .util .LinkedList ;
2321import java .util .List ;
2422
2523import org .neo4j .driver .internal .net .BoltServerAddress ;
2624import org .neo4j .driver .internal .net .pooling .PoolSettings ;
2725import org .neo4j .driver .internal .net .pooling .SocketConnectionPool ;
2826import org .neo4j .driver .internal .security .SecurityPlan ;
2927import org .neo4j .driver .internal .spi .Connection ;
30- import org .neo4j .driver .internal .spi .ConnectionPool ;
3128import org .neo4j .driver .internal .util .Consumer ;
3229import org .neo4j .driver .internal .util .Supplier ;
3330import org .neo4j .driver .v1 .Logging ;
3633import org .neo4j .driver .v1 .SessionMode ;
3734import org .neo4j .driver .v1 .StatementResult ;
3835import org .neo4j .driver .v1 .exceptions .ClientException ;
39- import org .neo4j .driver .v1 .exceptions .ClusterUnavailableException ;
4036import org .neo4j .driver .v1 .exceptions .ConnectionFailureException ;
37+ import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
4138
4239import static java .lang .String .format ;
4340
4441public class ClusterDriver extends BaseDriver
4542{
4643 private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverMembers" ;
4744 private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints" ;
48- private static final int MINIMUM_NUMBER_OF_SERVERS = 3 ;
4945
50- private final ConnectionPool connections ;
46+ private final Endpoints endpoints = new Endpoints ();
47+ private final ClusterSettings clusterSettings ;
48+ private boolean discoverable = true ;
5149
52- public ClusterDriver ( BoltServerAddress seedAddress , ConnectionSettings connectionSettings , SecurityPlan securityPlan ,
53- PoolSettings poolSettings , Logging logging )
50+ public ClusterDriver ( BoltServerAddress seedAddress , ConnectionSettings connectionSettings ,
51+ ClusterSettings clusterSettings ,
52+ SecurityPlan securityPlan ,
53+ PoolSettings poolSettings , Logging logging )
5454 {
55- super ( seedAddress , securityPlan , logging );
56- this .connections = new SocketConnectionPool ( connectionSettings , securityPlan , poolSettings , logging ) ;
55+ super ( new SocketConnectionPool ( connectionSettings , securityPlan , poolSettings , logging ), seedAddress , securityPlan , logging );
56+ this .clusterSettings = clusterSettings ;
5757 discover ();
5858 }
5959
60- void discover ()
60+ synchronized void discover ()
6161 {
62- final List <BoltServerAddress > newServers = new LinkedList <>( );
62+ if (!discoverable )
63+ {
64+ return ;
65+ }
66+
6367 try
6468 {
6569 boolean success = false ;
66- while ( !servers .isEmpty () && !success )
70+ while ( !connections .isEmpty () && !success )
6771 {
6872 success = call ( DISCOVER_MEMBERS , new Consumer <Record >()
6973 {
7074 @ Override
7175 public void accept ( Record record )
7276 {
73- newServers .add ( new BoltServerAddress ( record .get ( "address" ).asString () ) );
77+ connections .add (new BoltServerAddress ( record .get ( "address" ).asString () ));
7478 }
7579 } );
76-
7780 }
78- if ( success )
81+ if ( ! success )
7982 {
80- this .servers .clear ();
81- this .servers .addAll ( newServers );
82- log .debug ( "~~ [MEMBERS] -> %s" , newServers );
83- }
84- else
85- {
86- throw new ClusterUnavailableException ( "Run out of servers" );
83+ throw new ServiceUnavailableException ( "Run out of servers" );
8784 }
8885 }
8986 catch ( ClientException ex )
9087 {
9188 if ( ex .code ().equals ( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
9289 {
93- throw new ClientException ( "Discovery failed: could not find procedure %s" , DISCOVER_MEMBERS );
90+ //no procedure there, not much to do, stick with what we've got
91+ //this may happen because server is running in standalone mode
92+ log .warn ( "Could not find procedure %s" , DISCOVER_MEMBERS );
93+ discoverable = false ;
9494 }
9595 else
9696 {
@@ -99,13 +99,15 @@ public void accept( Record record )
9999 }
100100 }
101101
102+ //must be called from a synchronized method
102103 private boolean call ( String procedureName , Consumer <Record > recorder )
103104 {
105+ Connection acquire = null ;
106+ Session session = null ;
107+ try {
108+ acquire = connections .acquire ();
109+ session = new NetworkSession ( acquire , log );
104110
105- BoltServerAddress address = randomServer ();
106- Connection acquire = connections .acquire ( address );
107- try ( Session session = new NetworkSession ( acquire , log ) )
108- {
109111 StatementResult records = session .run ( format ( "CALL %s" , procedureName ) );
110112 while ( records .hasNext () )
111113 {
@@ -114,65 +116,162 @@ private boolean call( String procedureName, Consumer<Record> recorder )
114116 }
115117 catch ( ConnectionFailureException e )
116118 {
117- forget (address );
119+ if (acquire != null )
120+ {
121+ forget ( acquire .address () );
122+ }
118123 return false ;
119124 }
125+ finally
126+ {
127+ if (acquire != null )
128+ {
129+ acquire .close ();
130+ }
131+ if (session != null )
132+ {
133+ session .close ();
134+ }
135+ }
120136 return true ;
121137 }
122138
123- private void forget (BoltServerAddress address )
139+ //must be called from a synchronized method
140+ private void callWithRetry (String procedureName , Consumer <Record > recorder )
124141 {
125- servers .remove ( address );
126- connections .purge (address );
142+ while ( !connections .isEmpty () )
143+ {
144+ Connection acquire = null ;
145+ Session session = null ;
146+ try {
147+ acquire = connections .acquire ();
148+ session = new NetworkSession ( acquire , log );
149+ List <Record > list = session .run ( format ( "CALL %s" , procedureName ) ).list ();
150+ for ( Record record : list )
151+ {
152+ recorder .accept ( record );
153+ }
154+ //we found results give up
155+ return ;
156+ }
157+ catch ( ConnectionFailureException e )
158+ {
159+ if (acquire != null )
160+ {
161+ forget ( acquire .address () );
162+ }
163+ }
164+ finally
165+ {
166+ if (acquire != null )
167+ {
168+ acquire .close ();
169+ }
170+ if (session != null )
171+ {
172+ session .close ();
173+ }
174+ }
175+ }
176+
177+ throw new ServiceUnavailableException ( "Failed to communicate with any of the cluster members" );
178+ }
179+
180+ private synchronized void forget ( BoltServerAddress address )
181+ {
182+ connections .purge ( address );
127183 }
128184
129- //TODO this could return a WRITE session but that may lead to users using the LEADER too much
130- //a `ClientException` may be what we want
131185 @ Override
132186 public Session session ()
133187 {
134- throw new UnsupportedOperationException ( );
188+ return session ( SessionMode . WRITE );
135189 }
136190
137191 @ Override
138192 public Session session ( final SessionMode mode )
139193 {
140- return new ClusteredSession ( new Supplier < Connection >( )
194+ switch ( mode )
141195 {
142- @ Override
143- public Connection get ()
196+ case READ :
197+ return new ReadNetworkSession ( new Supplier < Connection > ()
144198 {
145- return acquireConnection ( mode );
146- }
147- }, log );
199+ @ Override
200+ public Connection get ()
201+ {
202+ return acquireConnection ( mode );
203+ }
204+ }, new Consumer <Connection >()
205+ {
206+ @ Override
207+ public void accept ( Connection connection )
208+ {
209+ forget ( connection .address () );
210+ }
211+ }, clusterSettings , log );
212+ case WRITE :
213+ throw new UnsupportedOperationException ();
214+ default :
215+ throw new UnsupportedOperationException ();
216+ }
148217 }
149218
150- private Connection acquireConnection ( SessionMode mode )
219+ private synchronized Connection acquireConnection ( SessionMode mode )
151220 {
221+ if (!discoverable )
222+ {
223+ return connections .acquire ();
224+ }
225+
152226 //if we are short on servers, find new ones
153- if ( servers . size () < MINIMUM_NUMBER_OF_SERVERS )
227+ if ( connections . addressCount () < clusterSettings . minimumNumberOfServers () )
154228 {
155229 discover ();
156230 }
157231
158- final BoltServerAddress [] addresses = new BoltServerAddress [ 2 ] ;
159- call ( ACQUIRE_ENDPOINTS , new Consumer < Record >()
232+ endpoints . clear () ;
233+ try
160234 {
161- @ Override
162- public void accept ( Record record )
235+ callWithRetry ( ACQUIRE_ENDPOINTS , new Consumer <Record >()
163236 {
164- addresses [0 ] = new BoltServerAddress ( record .get ( "READ" ).asString () );
165- addresses [1 ] = new BoltServerAddress ( record .get ( "WRITE" ).asString () );
237+ @ Override
238+ public void accept ( Record record )
239+ {
240+ String serverMode = record .get ( "mode" ).asString ();
241+ if ( serverMode .equals ( "READ" ) )
242+ {
243+ endpoints .readServer = new BoltServerAddress ( record .get ( "address" ).asString () );
244+ }
245+ else if ( serverMode .equals ( "WRITE" ) )
246+ {
247+ endpoints .writeServer = new BoltServerAddress ( record .get ( "address" ).asString () );
248+ }
249+ }
250+ } );
251+ }
252+ catch (ClientException e )
253+ {
254+ if ( e .code ().equals ( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
255+ {
256+ log .warn ( "Could not find procedure %s" , ACQUIRE_ENDPOINTS );
257+ discoverable = false ;
258+ return connections .acquire ();
166259 }
167- } );
260+ throw e ;
261+ }
262+
263+ if ( !endpoints .valid () )
264+ {
265+ throw new ServiceUnavailableException ("Could not establish any endpoints for the call" );
266+ }
168267
169268
170269 switch ( mode )
171270 {
172271 case READ :
173- return connections .acquire ( addresses [ 0 ] );
272+ return connections .acquire ( endpoints . readServer );
174273 case WRITE :
175- return connections .acquire ( addresses [ 0 ] );
274+ return connections .acquire ( endpoints . writeServer );
176275 default :
177276 throw new ClientException ( mode + " is not supported for creating new sessions" );
178277 }
@@ -191,4 +290,21 @@ public void close()
191290 }
192291 }
193292
293+ private static class Endpoints
294+ {
295+ BoltServerAddress readServer ;
296+ BoltServerAddress writeServer ;
297+
298+ public boolean valid ()
299+ {
300+ return readServer != null && writeServer != null ;
301+ }
302+
303+ public void clear ()
304+ {
305+ readServer = null ;
306+ writeServer = null ;
307+ }
308+ }
309+
194310}
0 commit comments