Skip to content

Commit e30e01e

Browse files
committed
Remove retry handling for read sessions
The plan of automatically handle retries on read sessions didn't really pan out since we don't really control when data is transported over the network and errors are noticed. Instead we treat reads and writes in the same way, i.e. throwing a `SessionExpiredException` on all connection failures.
1 parent e47dcf6 commit e30e01e

15 files changed

+370
-154
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.neo4j.driver.internal.security.SecurityPlan;
2727
import org.neo4j.driver.internal.spi.Connection;
2828
import org.neo4j.driver.internal.util.Consumer;
29-
import org.neo4j.driver.internal.util.Supplier;
3029
import org.neo4j.driver.v1.Logging;
3130
import org.neo4j.driver.v1.Record;
3231
import org.neo4j.driver.v1.Session;
@@ -180,6 +179,10 @@ private void callWithRetry(String procedureName, Consumer<Record> recorder )
180179
private synchronized void forget( BoltServerAddress address )
181180
{
182181
connections.purge( address );
182+
if ( endpoints.contains( address ) )
183+
{
184+
endpoints.clear();
185+
}
183186
}
184187

185188
@Override
@@ -191,32 +194,17 @@ public Session session()
191194
@Override
192195
public Session session( final SessionMode mode )
193196
{
194-
switch ( mode )
197+
return new ClusteredNetworkSession( acquireConnection( mode ), clusterSettings, new Consumer<BoltServerAddress>()
195198
{
196-
case READ:
197-
return new ReadNetworkSession( new Supplier<Connection>()
199+
@Override
200+
public void accept( BoltServerAddress address )
198201
{
199-
@Override
200-
public Connection get()
201-
{
202-
return acquireConnection( mode );
203-
}
204-
}, new Consumer<Connection>()
205-
{
206-
@Override
207-
public void accept( Connection connection )
208-
{
209-
forget( connection.address() );
210-
}
211-
}, clusterSettings, log );
212-
case WRITE:
213-
return new WriteNetworkSession( acquireConnection( mode ), clusterSettings, log );
214-
default:
215-
throw new UnsupportedOperationException();
216-
}
202+
forget( address );
203+
}
204+
}, log );
217205
}
218206

219-
private synchronized Connection acquireConnection( SessionMode mode )
207+
private Connection acquireConnection( SessionMode mode )
220208
{
221209
if (!discoverable)
222210
{
@@ -228,7 +216,24 @@ private synchronized Connection acquireConnection( SessionMode mode )
228216
{
229217
discover();
230218
}
219+
if ( !endpoints.valid() )
220+
{
221+
discoverEndpoints();
222+
}
231223

224+
switch ( mode )
225+
{
226+
case READ:
227+
return connections.acquire( endpoints.readServer );
228+
case WRITE:
229+
return connections.acquire( endpoints.writeServer );
230+
default:
231+
throw new ClientException( mode + " is not supported for creating new sessions" );
232+
}
233+
}
234+
235+
private synchronized void discoverEndpoints()
236+
{
232237
endpoints.clear();
233238
try
234239
{
@@ -255,7 +260,9 @@ else if ( serverMode.equals( "WRITE" ) )
255260
{
256261
log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS );
257262
discoverable = false;
258-
return connections.acquire();
263+
Connection connection = connections.acquire();
264+
endpoints.readServer = connection.address();
265+
endpoints.writeServer = connection.address();
259266
}
260267
throw e;
261268
}
@@ -264,17 +271,6 @@ else if ( serverMode.equals( "WRITE" ) )
264271
{
265272
throw new ServiceUnavailableException("Could not establish any endpoints for the call");
266273
}
267-
268-
269-
switch ( mode )
270-
{
271-
case READ:
272-
return connections.acquire( endpoints.readServer );
273-
case WRITE:
274-
return connections.acquire( endpoints.writeServer );
275-
default:
276-
throw new ClientException( mode + " is not supported for creating new sessions" );
277-
}
278274
}
279275

280276
@Override
@@ -292,8 +288,8 @@ public void close()
292288

293289
private static class Endpoints
294290
{
295-
BoltServerAddress readServer;
296-
BoltServerAddress writeServer;
291+
private BoltServerAddress readServer;
292+
private BoltServerAddress writeServer;
297293

298294
public boolean valid()
299295
{
@@ -305,6 +301,22 @@ public void clear()
305301
readServer = null;
306302
writeServer = null;
307303
}
304+
305+
boolean contains(BoltServerAddress address)
306+
{
307+
if (readServer != null && writeServer != null)
308+
{
309+
return readServer.equals( address ) || writeServer.equals( address );
310+
}
311+
else if ( readServer != null )
312+
{
313+
return readServer.equals( address );
314+
}
315+
else
316+
{
317+
return writeServer != null && writeServer.equals( address );
318+
}
319+
}
308320
}
309321

310322
}

driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,16 @@
2323

2424
public class ClusterSettings
2525
{
26-
private final int readRetry;
2726
private final int minimumNumberOfServers;
2827

29-
public ClusterSettings( int readRetry, int minimumNumberOfServers )
28+
public ClusterSettings( int minimumNumberOfServers )
3029
{
31-
this.readRetry = readRetry;
3230
this.minimumNumberOfServers = minimumNumberOfServers;
3331
}
3432

3533
public static ClusterSettings fromConfig( Config config )
3634
{
37-
return new ClusterSettings( config.maximumReadRetriesForCluster(), config.minimumKnownClusterSize() ) ;
38-
}
39-
40-
public int readRetry()
41-
{
42-
return readRetry;
35+
return new ClusterSettings( config.minimumKnownClusterSize() ) ;
4336
}
4437

4538
public int minimumNumberOfServers()

driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java renamed to driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,35 @@
1919
package org.neo4j.driver.internal;
2020

2121

22+
import org.neo4j.driver.internal.net.BoltServerAddress;
2223
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.util.Consumer;
2325
import org.neo4j.driver.v1.Logger;
2426
import org.neo4j.driver.v1.Statement;
2527
import org.neo4j.driver.v1.StatementResult;
2628
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
2729
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
2830

29-
public class WriteNetworkSession extends NetworkSession
31+
public class ClusteredNetworkSession extends NetworkSession
3032
{
33+
private final Consumer<BoltServerAddress> onFailedConnection;
3134

32-
WriteNetworkSession(Connection connection, ClusterSettings clusterSettings, Logger logger )
35+
ClusteredNetworkSession( Connection connection, ClusterSettings clusterSettings, Consumer<BoltServerAddress> onFailedConnection, Logger logger )
3336
{
34-
super(connection, logger);
37+
super( connection, logger );
38+
this.onFailedConnection = onFailedConnection;
3539
}
3640

3741
@Override
3842
public StatementResult run( Statement statement )
3943
{
4044
try
4145
{
42-
return super.run( statement );
46+
return new ClusteredStatementResult( super.run( statement ), connection.address(), onFailedConnection );
4347
}//TODO we need to catch exceptions due to leader switches etc here
4448
catch ( ConnectionFailureException e )
4549
{
50+
onFailedConnection.accept( connection.address() );
4651
throw new SessionExpiredException( "Failed to perform write load to server", e );
4752
}
4853

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import java.util.List;
22+
23+
import org.neo4j.driver.internal.net.BoltServerAddress;
24+
import org.neo4j.driver.internal.util.Consumer;
25+
import org.neo4j.driver.v1.Record;
26+
import org.neo4j.driver.v1.StatementResult;
27+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
28+
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
29+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
30+
import org.neo4j.driver.v1.summary.ResultSummary;
31+
import org.neo4j.driver.v1.util.Function;
32+
33+
public class ClusteredStatementResult implements StatementResult
34+
{
35+
private final StatementResult delegate;
36+
private final BoltServerAddress address;
37+
private final Consumer<BoltServerAddress> onFailedConnection;
38+
39+
ClusteredStatementResult( StatementResult delegate, BoltServerAddress address, Consumer<BoltServerAddress> onFailedConnection )
40+
{
41+
this.delegate = delegate;
42+
this.address = address;
43+
this.onFailedConnection = onFailedConnection;
44+
}
45+
46+
@Override
47+
public List<String> keys()
48+
{
49+
return delegate.keys();
50+
}
51+
52+
@Override
53+
public boolean hasNext()
54+
{
55+
return delegate.hasNext();
56+
}
57+
58+
@Override
59+
public Record next()
60+
{
61+
try
62+
{
63+
return delegate.next();
64+
}
65+
catch ( ConnectionFailureException e )
66+
{
67+
throw sessionExpired( e );
68+
}
69+
}
70+
71+
72+
@Override
73+
public Record single() throws NoSuchRecordException
74+
{
75+
try
76+
{
77+
return delegate.single();
78+
}
79+
catch ( ConnectionFailureException e )
80+
{
81+
throw sessionExpired( e );
82+
}
83+
}
84+
85+
@Override
86+
public Record peek()
87+
{
88+
try
89+
{
90+
return delegate.peek();
91+
}
92+
catch ( ConnectionFailureException e )
93+
{
94+
throw sessionExpired( e );
95+
}
96+
}
97+
98+
@Override
99+
public List<Record> list()
100+
{
101+
try
102+
{
103+
return delegate.list();
104+
}
105+
catch ( ConnectionFailureException e )
106+
{
107+
throw sessionExpired( e );
108+
}
109+
}
110+
111+
@Override
112+
public <T> List<T> list( Function<Record,T> mapFunction )
113+
{
114+
try
115+
{
116+
return delegate.list(mapFunction);
117+
}
118+
catch ( ConnectionFailureException e )
119+
{
120+
throw sessionExpired( e );
121+
}
122+
}
123+
124+
@Override
125+
public ResultSummary consume()
126+
{
127+
try
128+
{
129+
return delegate.consume();
130+
}
131+
catch ( ConnectionFailureException e )
132+
{
133+
throw sessionExpired( e );
134+
}
135+
}
136+
137+
private SessionExpiredException sessionExpired( ConnectionFailureException e )
138+
{
139+
onFailedConnection.accept( address );
140+
return new SessionExpiredException( String.format( "Server at %s is no longer available", address.toString()), e);
141+
}
142+
}

0 commit comments

Comments
 (0)