diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 29056d9238..55e6902edd 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -39,7 +39,7 @@ import static org.neo4j.driver.v1.Values.ofValue; import static org.neo4j.driver.v1.Values.value; -class ExplicitTransaction implements Transaction +public class ExplicitTransaction implements Transaction { private enum State { @@ -71,7 +71,7 @@ private enum State private String bookmark = null; private State state = State.ACTIVE; - ExplicitTransaction( Connection conn, Runnable cleanup ) + public ExplicitTransaction( Connection conn, Runnable cleanup ) { this( conn, cleanup, null ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index 8289f1cf33..158c0b83c6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -18,7 +18,7 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Logger; import static java.lang.System.lineSeparator; @@ -28,7 +28,7 @@ class LeakLoggingNetworkSession extends NetworkSession private final Logger log; private final String stackTrace; - LeakLoggingNetworkSession( Connection connection, Logger log ) + LeakLoggingNetworkSession( PooledConnection connection, Logger log ) { super( connection ); this.log = log; diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java index 7ff959d98a..b7c8feeb42 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java @@ -18,7 +18,7 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; @@ -35,7 +35,7 @@ class LeakLoggingNetworkSessionFactory implements SessionFactory } @Override - public Session newInstance( Connection connection ) + public Session newInstance( PooledConnection connection ) { return new LeakLoggingNetworkSession( connection, logger ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 4b0b09406d..a02e49f69b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -24,6 +24,7 @@ import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Record; @@ -41,7 +42,7 @@ public class NetworkSession implements Session { - protected Connection connection; + private final PooledConnection connection; private final String sessionId; private final Logger logger; @@ -67,7 +68,7 @@ public void run() private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); - NetworkSession( Connection connection ) + public NetworkSession( PooledConnection connection ) { this.connection = connection; diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java index c8f0f85777..a1fb63927a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java @@ -18,13 +18,13 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Session; class NetworkSessionFactory implements SessionFactory { @Override - public Session newInstance( Connection connection ) + public Session newInstance( PooledConnection connection ) { return new NetworkSession( connection ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java index cdc594b2de..306ac10c03 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java @@ -22,8 +22,8 @@ import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logging; @@ -62,12 +62,11 @@ public RoutingDriver( @Override protected Session newSessionWithMode( AccessMode mode ) { - Connection connection = acquireConnection( mode ); - Session networkSession = sessionFactory.newInstance( connection ); - return new RoutingNetworkSession( networkSession, mode, connection.boltServerAddress(), loadBalancer ); + PooledConnection connection = acquireConnection( mode ); + return sessionFactory.newInstance( connection ); } - private Connection acquireConnection( AccessMode role ) + private PooledConnection acquireConnection( AccessMode role ) { switch ( role ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java deleted file mode 100644 index 95d5c90306..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - - -import java.util.Map; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Record; -import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.Statement; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.Transaction; -import org.neo4j.driver.v1.Value; -import org.neo4j.driver.v1.Values; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.Neo4jException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; -import org.neo4j.driver.v1.types.TypeSystem; - -import static java.lang.String.format; -import static org.neo4j.driver.v1.Values.value; - -/** - * A session that safely handles routing errors. - */ -public class RoutingNetworkSession implements Session -{ - protected final Session delegate; - private final BoltServerAddress address; - private final AccessMode mode; - private final RoutingErrorHandler onError; - - RoutingNetworkSession( Session delegate, AccessMode mode, BoltServerAddress address, - RoutingErrorHandler onError ) - { - this.delegate = delegate; - this.mode = mode; - this.address = address; - this.onError = onError; - } - - @Override - public StatementResult run( String statementText ) - { - return run( statementText, Values.EmptyMap ); - } - - @Override - public StatementResult run( String statementText, Map statementParameters ) - { - Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters ); - return run( statementText, params ); - } - - @Override - public StatementResult run( String statementTemplate, Record statementParameters ) - { - Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() ); - return run( statementTemplate, params ); - } - - @Override - public StatementResult run( String statementText, Value statementParameters ) - { - return run( new Statement( statementText, statementParameters ) ); - } - - @Override - public StatementResult run( Statement statement ) - { - try - { - return new RoutingStatementResult( delegate.run( statement ), mode, address, onError ); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public TypeSystem typeSystem() - { - return delegate.typeSystem(); - } - - @Override - public Transaction beginTransaction() - { - return new RoutingTransaction( delegate.beginTransaction(), mode, address, onError); - } - - @Override - public Transaction beginTransaction( String bookmark ) - { - return new RoutingTransaction( delegate.beginTransaction(bookmark), mode, address, onError); - } - - @Override - public String lastBookmark() - { - return delegate.lastBookmark(); - } - - @Override - public void reset() - { - delegate.reset(); - } - - @Override - public boolean isOpen() - { - return delegate.isOpen(); - } - - @Override - public void close() - { - try - { - delegate.close(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired(e, onError, address); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - public BoltServerAddress address() - { - return address; - } - - static Neo4jException filterFailureToWrite( ClientException e, AccessMode mode, RoutingErrorHandler onError, - BoltServerAddress address ) - { - if ( isFailedToWrite( e ) ) - { - // The server is unaware of the session mode, so we have to implement this logic in the driver. - // In the future, we might be able to move this logic to the server. - switch ( mode ) - { - case READ: - return new ClientException( "Write queries cannot be performed in READ access mode." ); - case WRITE: - onError.onWriteFailure( address ); - return new SessionExpiredException( format( "Server at %s no longer accepts writes", address ) ); - default: - throw new IllegalArgumentException( mode + " not supported." ); - } - } - else - { - return e; - } - } - - static SessionExpiredException sessionExpired( ServiceUnavailableException e, RoutingErrorHandler onError, - BoltServerAddress address ) - { - onError.onConnectionFailure( address ); - return new SessionExpiredException( format( "Server at %s is no longer available", address.toString() ), e ); - } - - private static boolean isFailedToWrite( ClientException e ) - { - return e.code().equals( "Neo.ClientError.Cluster.NotALeader" ) || - e.code().equals( "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase" ); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java deleted file mode 100644 index 19682b6943..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import java.util.List; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Record; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.NoSuchRecordException; -import org.neo4j.driver.v1.summary.ResultSummary; -import org.neo4j.driver.v1.util.Function; - -import static org.neo4j.driver.internal.RoutingNetworkSession.filterFailureToWrite; -import static org.neo4j.driver.internal.RoutingNetworkSession.sessionExpired; - -public class RoutingStatementResult implements StatementResult -{ - private final StatementResult delegate; - private final AccessMode mode; - private final BoltServerAddress address; - private final RoutingErrorHandler onError; - - RoutingStatementResult( StatementResult delegate, AccessMode mode, BoltServerAddress address, - RoutingErrorHandler onError ) - { - this.delegate = delegate; - this.mode = mode; - this.address = address; - this.onError = onError; - } - - @Override - public List keys() - { - try - { - return delegate.keys(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public boolean hasNext() - { - try - { - return delegate.hasNext(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public Record next() - { - try - { - return delegate.next(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - - @Override - public Record single() throws NoSuchRecordException - { - try - { - return delegate.single(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public Record peek() - { - try - { - return delegate.peek(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public List list() - { - try - { - return delegate.list(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public List list( Function mapFunction ) - { - try - { - return delegate.list( mapFunction ); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public void remove() - { - throw new ClientException( "Removing records from a result is not supported." ); - } - - @Override - public ResultSummary consume() - { - try - { - return delegate.consume(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public ResultSummary summary() - { - try - { - return delegate.summary(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - public BoltServerAddress address() - { - return address; - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java deleted file mode 100644 index 42f21f69c5..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - - -import java.util.Map; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Record; -import org.neo4j.driver.v1.Statement; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.Transaction; -import org.neo4j.driver.v1.Value; -import org.neo4j.driver.v1.Values; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.types.TypeSystem; - -import static org.neo4j.driver.internal.RoutingNetworkSession.filterFailureToWrite; -import static org.neo4j.driver.internal.RoutingNetworkSession.sessionExpired; -import static org.neo4j.driver.v1.Values.value; - -/** - * A transaction that safely handles routing errors. - */ -public class RoutingTransaction implements Transaction -{ - protected final Transaction delegate; - private final AccessMode mode; - private final BoltServerAddress address; - private final RoutingErrorHandler onError; - - public RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address, - RoutingErrorHandler onError ) - { - this.delegate = delegate; - this.mode = mode; - this.address = address; - this.onError = onError; - } - - @Override - public StatementResult run( String statementText ) - { - return run( statementText, Values.EmptyMap ); - } - - @Override - public StatementResult run( String statementText, Map statementParameters ) - { - Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters ); - return run( statementText, params ); - } - - @Override - public StatementResult run( String statementTemplate, Record statementParameters ) - { - Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() ); - return run( statementTemplate, params ); - } - - @Override - public StatementResult run( String statementText, Value statementParameters ) - { - return run( new Statement( statementText, statementParameters ) ); - } - - @Override - public StatementResult run( Statement statement ) - { - try - { - return new RoutingStatementResult( delegate.run( statement ), mode, address, onError ); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired( e, onError, address ); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } - - @Override - public TypeSystem typeSystem() - { - return delegate.typeSystem(); - } - - - @Override - public void success() - { - delegate.success(); - } - - @Override - public void failure() - { - delegate.failure(); - } - - @Override - public boolean isOpen() - { - return delegate.isOpen(); - } - - @Override - public void close() - { - try - { - delegate.close(); - } - catch ( ServiceUnavailableException e ) - { - throw sessionExpired(e, onError, address); - } - catch ( ClientException e ) - { - throw filterFailureToWrite( e, mode, onError, address ); - } - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java index 550f7084fb..39039fbc8a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java @@ -18,10 +18,10 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Session; interface SessionFactory { - Session newInstance( Connection connection ); + Session newInstance( PooledConnection connection ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index 997b32e0c2..d3440555bf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -22,9 +22,10 @@ import org.neo4j.driver.internal.RoutingErrorHandler; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -73,14 +74,16 @@ private LoadBalancer( ensureRouting(); } - public Connection acquireReadConnection() throws ServiceUnavailableException + public PooledConnection acquireReadConnection() throws ServiceUnavailableException { - return acquireConnection( routingTable.readers() ); + PooledConnection connection = acquireConnection( routingTable.readers() ); + return new RoutingPooledConnection( connection, this, AccessMode.READ ); } - public Connection acquireWriteConnection() throws ServiceUnavailableException + public PooledConnection acquireWriteConnection() throws ServiceUnavailableException { - return acquireConnection( routingTable.writers() ); + PooledConnection connection = acquireConnection( routingTable.writers() ); + return new RoutingPooledConnection( connection, this, AccessMode.WRITE ); } @Override @@ -101,7 +104,7 @@ public void close() throws Exception connections.close(); } - private Connection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException + private PooledConnection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException { for ( ; ; ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java index 194a8f517c..b933f6da5c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java @@ -24,7 +24,7 @@ import org.neo4j.driver.internal.net.BoltServerAddress; -class RoundRobinAddressSet +public class RoundRobinAddressSet { private static final BoltServerAddress[] NONE = {}; private final AtomicInteger offset = new AtomicInteger(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java new file mode 100644 index 0000000000..a61afe7d3e --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.Map; +import java.util.Objects; + +import org.neo4j.driver.internal.RoutingErrorHandler; +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.summary.ServerInfo; + +import static java.lang.String.format; + +class RoutingPooledConnection implements PooledConnection +{ + private final PooledConnection delegate; + private final RoutingErrorHandler errorHandler; + private final AccessMode accessMode; + + RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode ) + { + this.delegate = delegate; + this.errorHandler = errorHandler; + this.accessMode = accessMode; + } + + @Override + public void init( String clientName, Map authToken ) + { + try + { + delegate.init( clientName, authToken ); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void run( String statement, Map parameters, Collector collector ) + { + try + { + delegate.run( statement, parameters, collector ); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void discardAll( Collector collector ) + { + try + { + delegate.discardAll( collector ); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void pullAll( Collector collector ) + { + try + { + delegate.pullAll( collector ); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void reset() + { + try + { + delegate.reset(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void resetAsync() + { + try + { + delegate.resetAsync(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void ackFailure() + { + try + { + delegate.ackFailure(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void sync() + { + try + { + delegate.sync(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void flush() + { + try + { + delegate.flush(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void receiveOne() + { + try + { + delegate.receiveOne(); + } + catch ( RuntimeException e ) + { + throw handledException( e ); + } + } + + @Override + public void close() + { + delegate.close(); + } + + @Override + public boolean isOpen() + { + return delegate.isOpen(); + } + + @Override + public void onError( Runnable runnable ) + { + delegate.onError( runnable ); + } + + @Override + public boolean hasUnrecoverableErrors() + { + return delegate.hasUnrecoverableErrors(); + } + + @Override + public boolean isAckFailureMuted() + { + return delegate.isAckFailureMuted(); + } + + @Override + public ServerInfo server() + { + return delegate.server(); + } + + @Override + public BoltServerAddress boltServerAddress() + { + return delegate.boltServerAddress(); + } + + @Override + public Logger logger() + { + return delegate.logger(); + } + + @Override + public long lastUsedTimestamp() + { + return delegate.lastUsedTimestamp(); + } + + @Override + public void dispose() + { + delegate.dispose(); + } + + private RuntimeException handledException( RuntimeException e ) + { + if ( e instanceof ServiceUnavailableException ) + { + return handledServiceUnavailableException( ((ServiceUnavailableException) e) ); + } + else if ( e instanceof ClientException ) + { + return handledClientException( ((ClientException) e) ); + } + else + { + return e; + } + } + + private RuntimeException handledServiceUnavailableException( ServiceUnavailableException e ) + { + BoltServerAddress address = boltServerAddress(); + errorHandler.onConnectionFailure( address ); + return new SessionExpiredException( format( "Server at %s is no longer available", address ), e ); + } + + private RuntimeException handledClientException( ClientException e ) + { + if ( isFailureToWrite( e ) ) + { + // The server is unaware of the session mode, so we have to implement this logic in the driver. + // In the future, we might be able to move this logic to the server. + switch ( accessMode ) + { + case READ: + return new ClientException( "Write queries cannot be performed in READ access mode." ); + case WRITE: + BoltServerAddress address = boltServerAddress(); + errorHandler.onWriteFailure( address ); + return new SessionExpiredException( format( "Server at %s no longer accepts writes", address ) ); + default: + throw new IllegalArgumentException( accessMode + " not supported." ); + } + } + return e; + } + + private static boolean isFailureToWrite( ClientException e ) + { + String errorCode = e.code(); + return Objects.equals( errorCode, "Neo.ClientError.Cluster.NotALeader" ) || + Objects.equals( errorCode, "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase" ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index d533bb1e01..0460c63109 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -185,18 +185,6 @@ public boolean isOpen() return delegate.isOpen(); } - @Override - public void onError( Runnable runnable ) - { - delegate.onError( runnable ); - } - - @Override - public boolean hasUnrecoverableErrors() - { - return delegate.hasUnrecoverableErrors(); - } - @Override public void resetAsync() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index cde937b0c6..357367222a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -268,18 +268,6 @@ public boolean isOpen() return socket.isOpen(); } - @Override - public void onError( Runnable runnable ) - { - throw new UnsupportedOperationException( "Error subscribers are not supported on SocketConnection." ); - } - - @Override - public boolean hasUnrecoverableErrors() - { - throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." ); - } - @Override public synchronized void resetAsync() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index 6f430b4f3b..61020e5c7c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index 618796ca98..84430fe27d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.net.pooling; import org.neo4j.driver.internal.spi.ConnectionValidator; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Consumer; /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java index 5480cc84fb..b779c49829 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java @@ -20,6 +20,7 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.ConnectionValidator; +import org.neo4j.driver.internal.spi.PooledConnection; class PooledConnectionValidator implements ConnectionValidator { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java similarity index 88% rename from driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java rename to driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java index bae48024c2..8f3c18b156 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java @@ -23,6 +23,7 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Logger; @@ -50,7 +51,7 @@ * | pool.close | * --------------------------------- */ -public class PooledConnection implements Connection +public class PooledSocketConnection implements PooledConnection { /** The real connection who will do all the real jobs */ private final Connection delegate; @@ -61,7 +62,7 @@ public class PooledConnection implements Connection private final Clock clock; private long lastUsedTimestamp; - public PooledConnection( Connection delegate, Consumer release, Clock clock ) + public PooledSocketConnection( Connection delegate, Consumer release, Clock clock ) { this.delegate = delegate; this.release = release; @@ -206,6 +207,7 @@ public boolean isOpen() return delegate.isOpen(); } + @Override public boolean hasUnrecoverableErrors() { return unrecoverableErrorsOccurred; @@ -248,6 +250,7 @@ public Logger logger() return delegate.logger(); } + @Override public void dispose() { delegate.close(); @@ -282,23 +285,37 @@ public void onError( Runnable runnable ) this.onError = runnable; } + @Override public long lastUsedTimestamp() { return lastUsedTimestamp; } - private boolean isProtocolViolationError(RuntimeException e ) + private boolean isProtocolViolationError( RuntimeException e ) { - return e instanceof Neo4jException - && ((Neo4jException) e).code().startsWith( "Neo.ClientError.Request" ); + if ( e instanceof Neo4jException ) + { + String errorCode = ((Neo4jException) e).code(); + if ( errorCode != null ) + { + return errorCode.startsWith( "Neo.ClientError.Request" ); + } + } + return false; } private boolean isClientOrTransientError( RuntimeException e ) { // Eg: DatabaseErrors and unknown (no status code or not neo4j exception) cause session to be discarded - return e instanceof Neo4jException - && (((Neo4jException) e).code().contains( "ClientError" ) - || ((Neo4jException) e).code().contains( "TransientError" )); + if ( e instanceof Neo4jException ) + { + String errorCode = ((Neo4jException) e).code(); + if ( errorCode != null ) + { + return errorCode.contains( "ClientError" ) || errorCode.contains( "TransientError" ); + } + } + return false; } private void updateLastUsedTimestamp() diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index f3ee7bb1bd..797a905771 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.ConnectionValidator; import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logging; @@ -68,7 +69,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo } @Override - public Connection acquire( final BoltServerAddress address ) + public PooledConnection acquire( final BoltServerAddress address ) { assertNotClosed(); BlockingPooledConnectionQueue connectionQueue = pool( address ); @@ -201,7 +202,7 @@ public PooledConnection get() PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( connectionQueue, connectionValidator ); Connection connection = connector.connect( address ); - PooledConnection pooledConnection = new PooledConnection( connection, releaseConsumer, clock ); + PooledConnection pooledConnection = new PooledSocketConnection( connection, releaseConsumer, clock ); connectionCreated = true; return pooledConnection; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 390c71dcc8..20860e65d0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -98,18 +98,6 @@ public interface Connection extends AutoCloseable */ boolean isOpen(); - /** - * If there are any errors that occur on this connection, invoke the given - * runnable. This is used in the driver to clean up resources associated with - * the connection, like an open transaction. - * - * @param runnable To be run on error. - */ - void onError( Runnable runnable ); - - - boolean hasUnrecoverableErrors(); - /** * Asynchronously sending reset to the socket output channel. */ diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index 185f2a7f32..5ca7268176 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -28,7 +28,7 @@ public interface ConnectionPool extends AutoCloseable * * @param address The address to acquire */ - Connection acquire( BoltServerAddress address ); + PooledConnection acquire( BoltServerAddress address ); /** * Removes all connections to a given address from the pool. diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java new file mode 100644 index 0000000000..8bd931d50f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.spi; + +import org.neo4j.driver.internal.util.Clock; + +public interface PooledConnection extends Connection +{ + /** + * If there are any errors that occur on this connection, invoke the given + * runnable. This is used in the driver to clean up resources associated with + * the connection, like an open transaction. + * + * @param runnable To be run on error. + */ + void onError( Runnable runnable ); + + /** + * Check if this connection experienced any unrecoverable errors. Connections with unrecoverable errors should be + * closed and not returned to the pool for future reuse. + * + * @return {@code true} if any unrecoverable error happened, {@code false} otherwise. + */ + boolean hasUnrecoverableErrors(); + + /** + * Timestamp of when this connection was used. This timestamp is updated when connection is returned to the pool. + * + * @return timestamp as returned by {@link Clock#millis()}. + */ + long lastUsedTimestamp(); + + /** + * Destroy this connection and associated network resources. + */ + void dispose(); +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java index 6bd711f41e..f8887b4d11 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java @@ -20,7 +20,7 @@ import org.junit.Test; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; @@ -35,7 +35,7 @@ public void createsLeakLoggingNetworkSessions() { SessionFactory factory = new LeakLoggingNetworkSessionFactory( mock( Logging.class ) ); - Session session = factory.newInstance( mock( Connection.class ) ); + Session session = factory.newInstance( mock( PooledConnection.class ) ); assertThat( session, instanceOf( LeakLoggingNetworkSession.class ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java index 5652f384c0..68629c5e5e 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java @@ -25,7 +25,7 @@ import java.lang.reflect.Method; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Session; @@ -83,9 +83,9 @@ private static void finalize( Session session ) throws Exception finalizeMethod.invoke( session ); } - private static Connection connectionMock( boolean open ) + private static PooledConnection connectionMock( boolean open ) { - Connection connection = mock( Connection.class ); + PooledConnection connection = mock( PooledConnection.class ); when( connection.isOpen() ).thenReturn( open ); return connection; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java index 7df9f3a1bd..3607040b76 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java @@ -20,7 +20,7 @@ import org.junit.Test; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Session; import static org.hamcrest.Matchers.instanceOf; @@ -34,7 +34,7 @@ public void createsNetworkSessions() { SessionFactory factory = new NetworkSessionFactory(); - Session session = factory.newInstance( mock( Connection.class ) ); + Session session = factory.newInstance( mock( PooledConnection.class ) ); assertThat( session, instanceOf( NetworkSession.class ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 86484300aa..40e868b295 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -40,8 +40,8 @@ public class NetworkSessionTest @Rule public ExpectedException exception = ExpectedException.none(); - private final Connection mock = mock( Connection.class ); - private NetworkSession sess = new NetworkSession( mock ); + private final PooledConnection mock = mock( PooledConnection.class ); + private final NetworkSession sess = new NetworkSession( mock ); @Test public void shouldSendAllOnRun() throws Throwable @@ -143,7 +143,7 @@ public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throw public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwable { // Given - NetworkSession sess = new NetworkSession( mock(Connection.class) ); + NetworkSession sess = new NetworkSession( mock( PooledConnection.class ) ); try { sess.close(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 4334a9506c..366e9fbdd2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -32,14 +32,15 @@ import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.EventLogger; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ProtocolException; @@ -101,10 +102,10 @@ public void shouldRediscoveryIfNoWritersProvided() serverInfo( "WRITE", "localhost:3333" ) ) ) ); // When - RoutingNetworkSession writing = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); // Then - assertEquals( boltAddress( "localhost", 3333 ), writing.address() ); + assertEquals( boltAddress( "localhost", 3333 ), writing.address ); } @Test @@ -120,12 +121,12 @@ public void shouldNotRediscoveryOnSessionAcquisitionIfNotNecessary() serverInfo( "WRITE", "localhost:5555" ) ) ) ); // When - RoutingNetworkSession writing = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession reading = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reading = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); // Then - assertEquals( boltAddress( "localhost", 3333 ), writing.address() ); - assertEquals( boltAddress( "localhost", 2222 ), reading.address() ); + assertEquals( boltAddress( "localhost", 3333 ), writing.address ); + assertEquals( boltAddress( "localhost", 2222 ), reading.address ); } @Test @@ -206,12 +207,12 @@ public void shouldForgetServersOnRediscovery() RoutingDriver routingDriver = driverWithPool( pool ); // When - RoutingNetworkSession write1 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write2 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); // Then - assertEquals( boltAddress( "localhost", 3333 ), write1.address() ); - assertEquals( boltAddress( "localhost", 3333 ), write2.address() ); + assertEquals( boltAddress( "localhost", 3333 ), write1.address ); + assertEquals( boltAddress( "localhost", 3333 ), write2.address ); } @Test @@ -229,12 +230,12 @@ public void shouldRediscoverOnTimeout() clock.progress( 11_000 ); // When - RoutingNetworkSession writing = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession reading = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reading = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); // Then - assertEquals( boltAddress( "localhost", 8888 ), writing.address() ); - assertEquals( boltAddress( "localhost", 7777 ), reading.address() ); + assertEquals( boltAddress( "localhost", 8888 ), writing.address ); + assertEquals( boltAddress( "localhost", 7777 ), reading.address ); } @Test @@ -251,12 +252,12 @@ public void shouldNotRediscoverWhenNoTimeout() clock.progress( 9900 ); // When - RoutingNetworkSession writer = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession reader = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writer = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reader = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); // Then - assertEquals( boltAddress( "localhost", 2222 ), reader.address() ); - assertEquals( boltAddress( "localhost", 3333 ), writer.address() ); + assertEquals( boltAddress( "localhost", 2222 ), reader.address ); + assertEquals( boltAddress( "localhost", 3333 ), writer.address ); } @Test @@ -268,20 +269,20 @@ public void shouldRoundRobinAmongReadServers() serverInfo( "WRITE", "localhost:3333" ) ); // When - RoutingNetworkSession read1 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); - RoutingNetworkSession read2 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); - RoutingNetworkSession read3 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); - RoutingNetworkSession read4 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); - RoutingNetworkSession read5 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); - RoutingNetworkSession read6 = (RoutingNetworkSession) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read3 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read4 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read5 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read6 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); // Then - assertEquals( read1.address(), read4.address() ); - assertEquals( read2.address(), read5.address() ); - assertEquals( read3.address(), read6.address() ); - assertNotEquals( read1.address(), read2.address() ); - assertNotEquals( read2.address(), read3.address() ); - assertNotEquals( read3.address(), read1.address() ); + assertEquals( read1.address, read4.address ); + assertEquals( read2.address, read5.address ); + assertEquals( read3.address, read6.address ); + assertNotEquals( read1.address, read2.address ); + assertNotEquals( read2.address, read3.address ); + assertNotEquals( read3.address, read1.address ); } @Test @@ -293,20 +294,20 @@ public void shouldRoundRobinAmongWriteServers() serverInfo( "WRITE", "localhost:2222", "localhost:2223", "localhost:2224" ) ); // When - RoutingNetworkSession write1 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write2 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write3 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write4 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write5 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); - RoutingNetworkSession write6 = (RoutingNetworkSession) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write3 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write4 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write5 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write6 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); // Then - assertEquals( write1.address(), write4.address() ); - assertEquals( write2.address(), write5.address() ); - assertEquals( write3.address(), write6.address() ); - assertNotEquals( write1.address(), write2.address() ); - assertNotEquals( write2.address(), write3.address() ); - assertNotEquals( write3.address(), write1.address() ); + assertEquals( write1.address, write4.address ); + assertEquals( write2.address, write5.address ); + assertEquals( write3.address, write6.address ); + assertNotEquals( write1.address, write2.address ); + assertNotEquals( write2.address, write3.address ); + assertNotEquals( write3.address, write1.address ); } @Test @@ -338,7 +339,8 @@ private final RoutingDriver driverWithServers( long ttl, Map... s private RoutingDriver driverWithPool( ConnectionPool pool ) { RoutingSettings settings = new RoutingSettings( 10, 5_000 ); - return new RoutingDriver( settings, SEED, pool, insecure(), new NetworkSessionFactory(), clock, logging ); + SessionFactory sessionFactory = new NetworkSessionWithAddressFactory(); + return new RoutingDriver( settings, SEED, pool, insecure(), sessionFactory, clock, logging ); } @SafeVarargs @@ -362,15 +364,15 @@ private ConnectionPool pool( final Answer toGetServers, final Answer... furtherG { ConnectionPool pool = mock( ConnectionPool.class ); - when( pool.acquire( any( BoltServerAddress.class ) ) ).thenAnswer( new Answer() + when( pool.acquire( any( BoltServerAddress.class ) ) ).thenAnswer( new Answer() { int answer; @Override - public Connection answer( InvocationOnMock invocationOnMock ) throws Throwable + public PooledConnection answer( InvocationOnMock invocationOnMock ) throws Throwable { BoltServerAddress address = invocationOnMock.getArgumentAt( 0, BoltServerAddress.class ); - Connection connection = mock( Connection.class ); + PooledConnection connection = mock( PooledConnection.class ); when( connection.isOpen() ).thenReturn( true ); when( connection.boltServerAddress() ).thenReturn( address ); doAnswer( withKeys( "ttl", "servers" ) ).when( connection ).run( @@ -419,6 +421,26 @@ void collect( Collector collector ) }; } + private static class NetworkSessionWithAddressFactory implements SessionFactory + { + @Override + public Session newInstance( PooledConnection connection ) + { + return new NetworkSessionWithAddress( connection ); + } + } + + private static class NetworkSessionWithAddress extends NetworkSession + { + final BoltServerAddress address; + + NetworkSessionWithAddress( PooledConnection connection ) + { + super( connection ); + this.address = connection.boltServerAddress(); + } + } + private static abstract class CollectorAnswer implements Answer { abstract void collect( Collector collector ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingNetworkSessionTest.java deleted file mode 100644 index 6cea871565..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingNetworkSessionTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Before; -import org.junit.Test; - -import java.util.Map; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.spi.Collector; -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -public class RoutingNetworkSessionTest -{ - private Connection connection; - private RoutingErrorHandler onError; - private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); - - @Before - public void setUp() - { - connection = mock( Connection.class ); - when( connection.boltServerAddress() ).thenReturn( LOCALHOST ); - when( connection.isOpen() ).thenReturn( true ); - onError = mock( RoutingErrorHandler.class ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleConnectionFailures() - { - // Given - doThrow( new ServiceUnavailableException( "oh no" ) ). - when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - - RoutingNetworkSession result = - new RoutingNetworkSession( new NetworkSession( connection ), AccessMode.WRITE, connection.boltServerAddress(), - onError ); - - // When - try - { - result.run( "CREATE ()" ); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleWriteFailuresInWriteAccessMode() - { - // Given - doThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ). - when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - RoutingNetworkSession session = - new RoutingNetworkSession( new NetworkSession(connection), AccessMode.WRITE, connection.boltServerAddress(), - onError ); - - // When - try - { - session.run( "CREATE ()" ); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleWriteFailuresInReadAccessMode() - { - // Given - doThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ). - when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - RoutingNetworkSession session = - new RoutingNetworkSession( new NetworkSession( connection ), AccessMode.READ, connection.boltServerAddress(), onError ); - - // When - try - { - session.run( "CREATE ()" ); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldRethrowNonWriteFailures() - { - // Given - ClientException toBeThrown = new ClientException( "code", "oh no!" ); - doThrow( toBeThrown ). - when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - RoutingNetworkSession session = - new RoutingNetworkSession( new NetworkSession( connection ), AccessMode.WRITE, connection.boltServerAddress(), onError ); - - // When - try - { - session.run( "CREATE ()" ); - fail(); - } - catch ( ClientException e ) - { - assertThat( e, is( toBeThrown ) ); - } - - // Then - verifyZeroInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailuresOnClose() - { - // Given - doThrow( new ServiceUnavailableException( "oh no" ) ). - when( connection ).sync(); - - RoutingNetworkSession session = - new RoutingNetworkSession( new NetworkSession( connection ), AccessMode.WRITE, connection.boltServerAddress(), - onError ); - - // When - try - { - session.close(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailuresOnClose() - { - // Given - doThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ).when( connection ).sync(); - - RoutingNetworkSession session = - new RoutingNetworkSession( new NetworkSession( connection ), AccessMode.WRITE, connection.boltServerAddress(), onError ); - - // When - try - { - session.close(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldDelegateLastBookmark() - { - // Given - Session inner = mock( Session.class ); - RoutingNetworkSession session = - new RoutingNetworkSession( inner, AccessMode.WRITE, connection.boltServerAddress(), onError ); - - - // When - session.lastBookmark(); - - // Then - verify( inner ).lastBookmark(); - } - - @Test - public void shouldDelegateReset() - { - // Given - Session inner = mock( Session.class ); - RoutingNetworkSession session = - new RoutingNetworkSession( inner, AccessMode.WRITE, connection.boltServerAddress(), onError ); - - - // When - session.reset(); - - // Then - verify( inner ).reset(); - } - - @Test - public void shouldDelegateIsOpen() - { - // Given - Session inner = mock( Session.class ); - RoutingNetworkSession session = - new RoutingNetworkSession( inner, AccessMode.WRITE, connection.boltServerAddress(), onError ); - - - // When - session.isOpen(); - - // Then - verify( inner ).isOpen(); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingReadStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingReadStatementResultTest.java deleted file mode 100644 index 563e529f5c..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingReadStatementResultTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Test; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ClientException; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.neo4j.driver.v1.AccessMode.READ; - -public class RoutingReadStatementResultTest -{ - private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); - private StatementResult delegate = mock( StatementResult.class ); - private RoutingErrorHandler onError = mock( RoutingErrorHandler.class ); - - @Test - public void shouldHandleWriteFailureOnConsume() - { - // Given - when( delegate.consume() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.consume(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnHasNext() - { - // Given - when( delegate.hasNext() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.hasNext(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnKeys() - { - // Given - when( delegate.keys() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.keys(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnList() - { - // Given - when( delegate.list() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.list(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnNext() - { - // Given - when( delegate.next() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.next(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnPeek() - { - // Given - when( delegate.peek() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.peek(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnSingle() - { - // Given - when( delegate.single() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, READ, LOCALHOST, onError ); - - // When - try - { - result.single(); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - - // Then - verifyNoMoreInteractions( onError ); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingStatementResultTest.java deleted file mode 100644 index 3a25212d69..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingStatementResultTest.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - - -@RunWith(Parameterized.class) -public class RoutingStatementResultTest -{ - private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); - private StatementResult delegate = mock( StatementResult.class ); - private RoutingErrorHandler onError = mock( RoutingErrorHandler.class ); - private final AccessMode accessMode; - - @Parameterized.Parameters(name = "accessMode-{0}") - public static Collection data() - { - return Arrays.asList( AccessMode.values() ); - } - - public RoutingStatementResultTest( AccessMode accessMode ) - { - this.accessMode = accessMode; - } - - @Test - public void shouldHandleConnectionFailureOnConsume() - { - // Given - when( delegate.consume() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.consume(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnHasNext() - { - // Given - when( delegate.hasNext() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.hasNext(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnKeys() - { - // Given - when( delegate.keys() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.keys(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnList() - { - // Given - when( delegate.list() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.list(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnNext() - { - // Given - when( delegate.next() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.next(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnPeek() - { - // Given - when( delegate.peek() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.peek(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailureOnSingle() - { - // Given - when( delegate.single() ).thenThrow( new ServiceUnavailableException( "oh no" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, accessMode, LOCALHOST, onError ); - - // When - try - { - result.single(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingTransactionTest.java deleted file mode 100644 index 1118cc7f21..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingTransactionTest.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.Map; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.spi.Collector; -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Transaction; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -public class RoutingTransactionTest -{ - private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); - private Connection connection; - private RoutingErrorHandler onError; - private Runnable cleanup; - - private Answer throwingAnswer( final Throwable throwable ) - { - return new Answer() - { - @Override - public Void answer( InvocationOnMock invocationOnMock ) throws Throwable - { - String statement = (String) invocationOnMock.getArguments()[0]; - if ( statement.equals( "BEGIN" ) ) - { - return null; - } - else - { - throw throwable; - } - } - }; - } - - @Before - public void setUp() - { - connection = mock( Connection.class ); - when( connection.boltServerAddress() ).thenReturn( LOCALHOST ); - when( connection.isOpen() ).thenReturn( true ); - onError = mock( RoutingErrorHandler.class ); - cleanup = mock( Runnable.class ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleConnectionFailures() - { - // Given - - doAnswer( throwingAnswer( new ServiceUnavailableException( "oh no" ) ) ) - .when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.READ, LOCALHOST, - onError ); - - // When - try - { - tx.run( "CREATE ()" ); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleWriteFailuresInWriteAccessMode() - { - // Given - doAnswer( throwingAnswer( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ) ) - .when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - try - { - tx.run( "CREATE ()" ); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldHandleWriteFailuresInReadAccessMode() - { - // Given - doAnswer( throwingAnswer( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ) ) - .when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.READ, - connection.boltServerAddress(), onError ); - - // When - try - { - tx.run( "CREATE ()" ); - fail(); - } - catch ( ClientException e ) - { - //ignore - } - verifyNoMoreInteractions( onError ); - } - - @SuppressWarnings( "unchecked" ) - @Test - public void shouldRethrowNonWriteFailures() - { - // Given - ClientException toBeThrown = new ClientException( "code", "oh no!" ); - doAnswer( throwingAnswer( toBeThrown ) ) - .when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - try - { - tx.run( "CREATE ()" ); - fail(); - } - catch ( ClientException e ) - { - assertThat( e, is( toBeThrown ) ); - } - - // Then - verifyZeroInteractions( onError ); - } - - @Test - public void shouldHandleConnectionFailuresOnClose() - { - // Given - doThrow( new ServiceUnavailableException( "oh no" ) ). - when( connection ).sync(); - - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - try - { - tx.close(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onConnectionFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailuresOnClose() - { - // Given - doThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ).when( connection ).sync(); - - RoutingTransaction tx = - new RoutingTransaction( new ExplicitTransaction( connection, cleanup ), AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - try - { - tx.close(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - - @Test - public void shouldDelegateSuccess() - { - // Given - Transaction inner = mock( Transaction.class ); - RoutingTransaction tx = - new RoutingTransaction(inner, AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - tx.success(); - - // Then - verify( inner ).success(); - } - - @Test - public void shouldDelegateFailure() - { - // Given - Transaction inner = mock( Transaction.class ); - RoutingTransaction tx = - new RoutingTransaction(inner, AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - tx.failure(); - - // Then - verify( inner ).failure(); - } - - @Test - public void shouldDelegateIsOpen() - { - // Given - Transaction inner = mock( Transaction.class ); - RoutingTransaction tx = - new RoutingTransaction(inner, AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - tx.isOpen(); - - // Then - verify( inner ).isOpen(); - } - - @Test - public void shouldDelegateTypesystem() - { - // Given - Transaction inner = mock( Transaction.class ); - RoutingTransaction tx = - new RoutingTransaction(inner, AccessMode.WRITE, - connection.boltServerAddress(), onError ); - - // When - tx.typeSystem(); - - // Then - verify( inner ).typeSystem(); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingWriteStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingWriteStatementResultTest.java deleted file mode 100644 index 251a4334dc..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingWriteStatementResultTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Test; - -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.neo4j.driver.v1.AccessMode.WRITE; - -public class RoutingWriteStatementResultTest -{ - private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); - private StatementResult delegate = mock( StatementResult.class ); - private RoutingErrorHandler onError = mock( RoutingErrorHandler.class ); - - @Test - public void shouldHandleWriteFailureOnConsume() - { - // Given - when( delegate.consume() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.consume(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnHasNext() - { - // Given - when( delegate.hasNext() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.hasNext(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnKeys() - { - // Given - when( delegate.keys() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.keys(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnList() - { - // Given - when( delegate.list() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.list(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnNext() - { - // Given - when( delegate.next() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.next(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnPeek() - { - // Given - when( delegate.peek() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.peek(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } - - @Test - public void shouldHandleWriteFailureOnSingle() - { - // Given - when( delegate.single() ) - .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); - RoutingStatementResult result = - new RoutingStatementResult( delegate, WRITE, LOCALHOST, onError ); - - // When - try - { - result.single(); - fail(); - } - catch ( SessionExpiredException e ) - { - //ignore - } - - // Then - verify( onError ).onWriteFailure( LOCALHOST ); - verifyNoMoreInteractions( onError ); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java index ab000c29a4..55af73bd37 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java @@ -27,8 +27,8 @@ import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.net.pooling.PooledConnection; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.value.StringValue; import org.neo4j.driver.v1.Record; diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 66b1da4d95..342cb37e19 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -25,14 +25,18 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.neo4j.driver.internal.RoutingTransaction; +import org.neo4j.driver.internal.ExplicitTransaction; +import org.neo4j.driver.internal.NetworkSession; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.util.Resource; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; @@ -40,6 +44,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -96,32 +101,75 @@ public void ensureRouting() public void shouldEnsureRoutingWhenAcquireConn() throws Exception { // given - Connection writerConn = mock( Connection.class ); - Connection readConn = mock( Connection.class ); + PooledConnection writerConn = mock( PooledConnection.class ); + PooledConnection readConn = mock( PooledConnection.class ); LoadBalancer balancer = setupLoadBalancer( writerConn, readConn ); LoadBalancer spy = spy( balancer ); // when Connection connection = spy.acquireReadConnection(); + connection.init( "Test", Collections.emptyMap() ); // then verify( spy ).ensureRouting(); - assertThat( connection, equalTo( readConn ) ); + verify( readConn ).init( "Test", Collections.emptyMap() ); } @Test public void shouldAcquireReaderOrWriterConn() throws Exception { - Connection writerConn = mock( Connection.class ); - Connection readConn = mock( Connection.class ); + PooledConnection writerConn = mock( PooledConnection.class ); + PooledConnection readConn = mock( PooledConnection.class ); LoadBalancer balancer = setupLoadBalancer( writerConn, readConn ); - // when & then - assertThat( balancer.acquireReadConnection(), equalTo( readConn ) ); - assertThat( balancer.acquireWriteConnection(), equalTo( writerConn ) ); + Connection acquiredReadConn = balancer.acquireReadConnection(); + acquiredReadConn.init( "TestRead", Collections.emptyMap() ); + verify( readConn ).init( "TestRead", Collections.emptyMap() ); + + Connection acquiredWriteConn = balancer.acquireWriteConnection(); + acquiredWriteConn.init( "TestWrite", Collections.emptyMap() ); + verify( writerConn ).init( "TestWrite", Collections.emptyMap() ); + } + + @Test + public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingTx() + { + RoutingTable routingTable = mock( RoutingTable.class ); + ConnectionPool connectionPool = mock( ConnectionPool.class ); + Rediscovery rediscovery = mock( Rediscovery.class ); + LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER ); + BoltServerAddress address = new BoltServerAddress( "host", 42 ); + + PooledConnection connection = newConnectionWithFailingSync( address ); + Connection routingConnection = new RoutingPooledConnection( connection, loadBalancer, AccessMode.WRITE ); + Transaction tx = new ExplicitTransaction( routingConnection, mock( Runnable.class ) ); + + assertThrowsSessionExpiredException( tx ); + + verify( routingTable ).forget( address ); + verify( connectionPool ).purge( address ); + } + + @Test + public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingSession() + { + RoutingTable routingTable = mock( RoutingTable.class ); + ConnectionPool connectionPool = mock( ConnectionPool.class ); + Rediscovery rediscovery = mock( Rediscovery.class ); + LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER ); + BoltServerAddress address = new BoltServerAddress( "host", 42 ); + + PooledConnection connection = newConnectionWithFailingSync( address ); + PooledConnection routingConnection = new RoutingPooledConnection( connection, loadBalancer, AccessMode.WRITE ); + NetworkSession session = new NetworkSession( routingConnection ); + + assertThrowsSessionExpiredException( session ); + + verify( routingTable ).forget( address ); + verify( connectionPool ).purge( address ); } - private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readConn ) + private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConnection readConn ) { BoltServerAddress writer = mock( BoltServerAddress.class ); BoltServerAddress reader = mock( BoltServerAddress.class ); @@ -143,24 +191,21 @@ private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readCo return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER ); } - @Test - public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable() + private static PooledConnection newConnectionWithFailingSync( BoltServerAddress address ) { - Transaction tx = mock( Transaction.class ); - RoutingTable routingTable = mock( RoutingTable.class ); - ConnectionPool connectionPool = mock( ConnectionPool.class ); - Rediscovery rediscovery = mock( Rediscovery.class ); - LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER ); - BoltServerAddress address = new BoltServerAddress( "host", 42 ); - - RoutingTransaction routingTx = new RoutingTransaction( tx, AccessMode.WRITE, address, loadBalancer ); - - ServiceUnavailableException txCloseError = new ServiceUnavailableException( "Oh!" ); - doThrow( txCloseError ).when( tx ).close(); + PooledConnection connection = mock( PooledConnection.class ); + doReturn( true ).when( connection ).isOpen(); + doReturn( address ).when( connection ).boltServerAddress(); + ServiceUnavailableException closeError = new ServiceUnavailableException( "Oh!" ); + doThrow( closeError ).when( connection ).sync(); + return connection; + } + private static void assertThrowsSessionExpiredException( Resource resource ) + { try { - routingTx.close(); + resource.close(); fail( "Exception expected" ); } catch ( Exception e ) @@ -168,8 +213,5 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable() assertThat( e, instanceOf( SessionExpiredException.class ) ); assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); } - - verify( routingTable ).forget( address ); - verify( connectionPool ).purge( address ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index e986d3ec08..b62e1c6f1f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -30,9 +30,9 @@ import java.util.List; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.net.pooling.PooledConnection; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.exceptions.ProtocolException; diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java new file mode 100644 index 0000000000..71c602a5b7 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java @@ -0,0 +1,413 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.net.pooling.PoolSettings; +import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.net.pooling.PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; +import static org.neo4j.driver.internal.net.pooling.PoolSettings.NO_IDLE_CONNECTION_TEST; +import static org.neo4j.driver.internal.spi.Collector.NO_OP; +import static org.neo4j.driver.internal.util.Matchers.containsReader; +import static org.neo4j.driver.internal.util.Matchers.containsRouter; +import static org.neo4j.driver.internal.util.Matchers.containsWriter; +import static org.neo4j.driver.v1.Values.value; + +@RunWith( Parameterized.class ) +public class RoutingPooledConnectionErrorHandlingTest +{ + private static final BoltServerAddress ADDRESS1 = new BoltServerAddress( "server-1", 26000 ); + private static final BoltServerAddress ADDRESS2 = new BoltServerAddress( "server-2", 27000 ); + private static final BoltServerAddress ADDRESS3 = new BoltServerAddress( "server-3", 28000 ); + + @Parameter + public ConnectionMethod method; + + @Parameters( name = "{0}" ) + public static List methods() + { + return asList( + new Init(), + new Run(), + new DiscardAll(), + new PullAll(), + new Reset(), + new ResetAsync(), + new AckFailure(), + new Sync(), + new Flush(), + new ReceiveOne() ); + } + + @Test + public void shouldHandleServiceUnavailableException() + { + ServiceUnavailableException serviceUnavailable = new ServiceUnavailableException( "Oh!" ); + Connector connector = newConnectorWithThrowingConnections( serviceUnavailable ); + RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); + LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + + Connection readConnection = loadBalancer.acquireReadConnection(); + verifyServiceUnavailableHandling( readConnection, routingTable, connectionPool ); + + Connection writeConnection = loadBalancer.acquireWriteConnection(); + verifyServiceUnavailableHandling( writeConnection, routingTable, connectionPool ); + + assertThat( routingTable, containsRouter( ADDRESS3 ) ); + assertTrue( connectionPool.hasAddress( ADDRESS3 ) ); + } + + @Test + public void shouldHandleFailureToWriteWithWriteConnection() + { + testHandleFailureToWriteWithWriteConnection( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ) ); + testHandleFailureToWriteWithWriteConnection( + new ClientException( "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase", "" ) ); + } + + @Test + public void shouldHandleFailureToWrite() + { + testHandleFailureToWrite( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ) ); + testHandleFailureToWrite( new ClientException( "Neo.ClientError.General.ForbiddenOnReadOnlyDatabase", "" ) ); + } + + @Test + public void shouldPropagateThrowable() + { + testThrowablePropagation( new RuntimeException( "Random error" ) ); + } + + @Test + public void shouldPropagateClientExceptionWithoutErrorCode() + { + testThrowablePropagation( new ClientException( null, "Message" ) ); + } + + private void testHandleFailureToWriteWithWriteConnection( ClientException error ) + { + Connector connector = newConnectorWithThrowingConnections( error ); + RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); + LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + + Connection readConnection = loadBalancer.acquireReadConnection(); + try + { + method.invoke( readConnection ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + + BoltServerAddress address = readConnection.boltServerAddress(); + assertThat( routingTable, containsRouter( address ) ); + assertThat( routingTable, containsReader( address ) ); + assertThat( routingTable, containsWriter( address ) ); + assertTrue( connectionPool.hasAddress( address ) ); + } + + assertThat( routingTable, containsRouter( ADDRESS3 ) ); + assertTrue( connectionPool.hasAddress( ADDRESS3 ) ); + } + + private void testHandleFailureToWrite( ClientException error ) + { + Connector connector = newConnectorWithThrowingConnections( error ); + RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); + LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + + Connection readConnection = loadBalancer.acquireWriteConnection(); + try + { + method.invoke( readConnection ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + + BoltServerAddress address = readConnection.boltServerAddress(); + assertThat( routingTable, containsRouter( address ) ); + assertThat( routingTable, containsReader( address ) ); + assertThat( routingTable, not( containsWriter( address ) ) ); + assertTrue( connectionPool.hasAddress( address ) ); + } + + assertThat( routingTable, containsRouter( ADDRESS3 ) ); + assertTrue( connectionPool.hasAddress( ADDRESS3 ) ); + } + + private void testThrowablePropagation( Throwable error ) + { + Connector connector = newConnectorWithThrowingConnections( error ); + RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); + LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + + Connection readConnection = loadBalancer.acquireReadConnection(); + verifyThrowablePropagation( readConnection, routingTable, connectionPool, error.getClass() ); + + Connection writeConnection = loadBalancer.acquireWriteConnection(); + verifyThrowablePropagation( writeConnection, routingTable, connectionPool, error.getClass() ); + + assertThat( routingTable, containsRouter( ADDRESS3 ) ); + assertTrue( connectionPool.hasAddress( ADDRESS3 ) ); + } + + private void verifyServiceUnavailableHandling( Connection connection, RoutingTable routingTable, + ConnectionPool connectionPool ) + { + try + { + method.invoke( connection ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); + + BoltServerAddress address = connection.boltServerAddress(); + assertThat( routingTable, containsRouter( address ) ); + assertThat( routingTable, not( containsReader( address ) ) ); + assertThat( routingTable, not( containsWriter( address ) ) ); + assertFalse( connectionPool.hasAddress( address ) ); + } + } + + private void verifyThrowablePropagation( Connection connection, RoutingTable routingTable, + ConnectionPool connectionPool, Class expectedClass ) + { + try + { + method.invoke( connection ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( expectedClass ) ); + + BoltServerAddress address = connection.boltServerAddress(); + assertThat( routingTable, containsRouter( address ) ); + assertThat( routingTable, containsReader( address ) ); + assertThat( routingTable, containsWriter( address ) ); + assertTrue( connectionPool.hasAddress( address ) ); + } + } + + private Connector newConnectorWithThrowingConnections( final Throwable error ) + { + Connector connector = mock( Connector.class ); + when( connector.connect( any( BoltServerAddress.class ) ) ).thenAnswer( new Answer() + { + @Override + public Connection answer( InvocationOnMock invocation ) throws Throwable + { + BoltServerAddress address = invocation.getArgumentAt( 0, BoltServerAddress.class ); + Connection connection = newConnectionMock( address ); + method.invoke( doThrow( error ).doNothing().when( connection ) ); + return connection; + } + } ); + return connector; + } + + private static Connection newConnectionMock( BoltServerAddress address ) + { + Connection connection = mock( Connection.class ); + when( connection.boltServerAddress() ).thenReturn( address ); + return connection; + } + + private static RoutingTable newRoutingTable( BoltServerAddress... addresses ) + { + ClusterComposition clusterComposition = new ClusterComposition( + Long.MAX_VALUE, + new HashSet<>( asList( addresses ) ), + new HashSet<>( asList( addresses ) ), + new HashSet<>( asList( addresses ) ) ); + + RoutingTable routingTable = new ClusterRoutingTable( Clock.SYSTEM ); + routingTable.update( clusterComposition ); + + return routingTable; + } + + private static ConnectionPool newConnectionPool( Connector connector, BoltServerAddress... addresses ) + { + int maxIdleConnections = DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; + PoolSettings settings = new PoolSettings( maxIdleConnections, NO_IDLE_CONNECTION_TEST ); + SocketConnectionPool pool = new SocketConnectionPool( settings, connector, Clock.SYSTEM, DEV_NULL_LOGGING ); + + // force pool to create and memorize some connections + for ( BoltServerAddress address : addresses ) + { + List connections = new ArrayList<>(); + for ( int i = 0; i < maxIdleConnections; i++ ) + { + connections.add( pool.acquire( address ) ); + } + for ( PooledConnection connection : connections ) + { + connection.close(); + } + } + + return pool; + } + + private static LoadBalancer newLoadBalancer( RoutingTable routingTable, ConnectionPool connectionPool ) + { + return new LoadBalancer( routingTable, connectionPool, mock( Rediscovery.class ), DEV_NULL_LOGGER ); + } + + private interface ConnectionMethod + { + void invoke( Connection connection ); + } + + private static class Init implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.init( "JavaDriver", singletonMap( "Key", value( "Value" ) ) ); + } + } + + private static class Run implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.run( "CREATE (n:Node {name: {value}})", singletonMap( "value", value( "A" ) ), NO_OP ); + } + } + + private static class DiscardAll implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.discardAll( NO_OP ); + } + } + + private static class PullAll implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.pullAll( NO_OP ); + } + } + + private static class Reset implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.reset(); + } + } + + private static class ResetAsync implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.resetAsync(); + } + } + + private static class AckFailure implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.ackFailure(); + } + } + + private static class Sync implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.sync(); + } + } + + private static class Flush implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.flush(); + } + } + + private static class ReceiveOne implements ConnectionMethod + { + @Override + public void invoke( Connection connection ) + { + connection.receiveOne(); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 625fbc0cee..67d6fad4bd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logger; @@ -159,9 +160,9 @@ public void shouldTryToCloseAllUnderlyingConnections() doThrow( closeError2 ).when( connection2 ).close(); doThrow( closeError3 ).when( connection3 ).close(); - PooledConnection pooledConnection1 = new PooledConnection( connection1, mock( Consumer.class ), SYSTEM ); - PooledConnection pooledConnection2 = new PooledConnection( connection2, mock( Consumer.class ), SYSTEM ); - PooledConnection pooledConnection3 = new PooledConnection( connection3, mock( Consumer.class ), SYSTEM ); + PooledConnection pooledConnection1 = new PooledSocketConnection( connection1, mock( Consumer.class ), SYSTEM ); + PooledConnection pooledConnection2 = new PooledSocketConnection( connection2, mock( Consumer.class ), SYSTEM ); + PooledConnection pooledConnection3 = new PooledSocketConnection( connection3, mock( Consumer.class ), SYSTEM ); queue.offer( pooledConnection1 ); queue.offer( pooledConnection2 ); @@ -187,7 +188,7 @@ public void shouldLogWhenConnectionDisposeFails() Connection connection = mock( Connection.class ); RuntimeException closeError = new RuntimeException( "Fail" ); doThrow( closeError ).when( connection ).close(); - PooledConnection pooledConnection = new PooledConnection( connection, mock( Consumer.class ), SYSTEM ); + PooledConnection pooledConnection = new PooledSocketConnection( connection, mock( Consumer.class ), SYSTEM ); queue.offer( pooledConnection ); queue.terminate(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index ef81dac696..d3ea6c4e72 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -28,6 +28,7 @@ import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumers; import org.neo4j.driver.v1.Value; @@ -57,7 +58,7 @@ public class ConnectionInvalidationTest private final Clock clock = mock( Clock.class ); private final PooledConnection conn = - new PooledConnection( delegate, Consumers.noOp(), Clock.SYSTEM ); + new PooledSocketConnection( delegate, Consumers.noOp(), Clock.SYSTEM ); @SuppressWarnings( "unchecked" ) @Test @@ -66,7 +67,7 @@ public void shouldNotInvalidateConnectionThatIsUnableToRun() throws Throwable // Given a connection that's broken Mockito.doThrow( new ClientException( "That didn't work" ) ) .when( delegate ).run( anyString(), anyMap(), any( Collector.class ) ); - PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); + PooledConnection conn = new PooledSocketConnection( delegate, Consumers.noOp(), clock ); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) ); // When/Then @@ -98,7 +99,7 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable { // Given a connection that's broken Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).reset(); - PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); + PooledConnection conn = new PooledSocketConnection( delegate, Consumers.noOp(), clock ); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) ); // When/Then BlockingPooledConnectionQueue diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java index 1c336a7eb6..d7f209cc1d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java @@ -33,6 +33,7 @@ import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.summary.InternalServerInfo; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumers; @@ -178,7 +179,7 @@ public void isConnectedReturnsTrueWhenUnderlyingConnectionWorks() private static PooledConnection newPooledConnection( Connection connection ) { - return new PooledConnection( connection, Consumers.noOp(), Clock.SYSTEM ); + return new PooledSocketConnection( connection, Consumers.noOp(), Clock.SYSTEM ); } private static ConnectionPool connectionPoolMock( boolean knowsAddressed ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnectionTest.java similarity index 91% rename from driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java rename to driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnectionTest.java index 9800b25e0c..51841068fc 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnectionTest.java @@ -22,6 +22,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionValidator; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logging; @@ -41,7 +42,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; -public class PooledConnectionTest +public class PooledSocketConnectionTest { private static final ConnectionValidator VALID_CONNECTION = newFixedValidator( true, true ); @@ -59,7 +60,7 @@ public void shouldDisposeConnectionIfNotValidConnection() throws Throwable PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, INVALID_CONNECTION ); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -87,7 +88,7 @@ public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throw Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION ); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -117,8 +118,8 @@ public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws T Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ); - PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ); + PooledConnection shouldBeClosedConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -204,7 +205,7 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -233,7 +234,7 @@ public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledSocketConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -257,7 +258,7 @@ public void shouldAckFailureOnRecoverableFailure() throws Throwable Connection conn = mock( Connection.class ); ClientException error = new ClientException( "Neo.ClientError", "a recoverable error" ); doThrow( error ).when( conn ).sync(); - PooledConnection pooledConnection = new PooledConnection( + PooledConnection pooledConnection = new PooledSocketConnection( conn, mock( PooledConnectionReleaseConsumer.class ), mock( Clock.class ) ); @@ -284,7 +285,7 @@ public void shouldNotAckFailureOnUnRecoverableFailure() Connection conn = mock( Connection.class ); ClientException error = new ClientException( "an unrecoverable error" ); doThrow( error ).when( conn ).sync(); - PooledConnection pooledConnection = new PooledConnection( + PooledConnection pooledConnection = new PooledSocketConnection( conn, mock( PooledConnectionReleaseConsumer.class ), mock( Clock.class ) ); @@ -315,7 +316,7 @@ public void shouldThrowExceptionIfFailureReceivedForAckFailure() "Invalid server response message `FAILURE` received for client message `ACK_FAILURE`." ); doThrow( error ).doThrow( failedToAckFailError ).when( conn ).sync(); - PooledConnection pooledConnection = new PooledConnection( + PooledConnection pooledConnection = new PooledSocketConnection( conn, mock( PooledConnectionReleaseConsumer.class ), mock( Clock.class ) ); @@ -353,7 +354,7 @@ public void hasNewLastUsedTimestampWhenCreated() PooledConnectionReleaseConsumer releaseConsumer = mock( PooledConnectionReleaseConsumer.class ); Clock clock = when( mock( Clock.class ).millis() ).thenReturn( 42L ).getMock(); - PooledConnection connection = new PooledConnection( mock( Connection.class ), releaseConsumer, clock ); + PooledConnection connection = new PooledSocketConnection( mock( Connection.class ), releaseConsumer, clock ); assertEquals( 42L, connection.lastUsedTimestamp() ); } @@ -365,7 +366,7 @@ public void lastUsedTimestampUpdatedWhenConnectionClosed() Clock clock = when( mock( Clock.class ).millis() ) .thenReturn( 42L ).thenReturn( 4242L ).thenReturn( 424242L ).getMock(); - PooledConnection connection = new PooledConnection( mock( Connection.class ), releaseConsumer, clock ); + PooledConnection connection = new PooledSocketConnection( mock( Connection.class ), releaseConsumer, clock ); assertEquals( 42, connection.lastUsedTimestamp() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index 5b737c4f7a..d761981469 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -38,6 +38,7 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Logging; diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java new file mode 100644 index 0000000000..ac0b1ffbb7 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +import org.neo4j.driver.internal.cluster.RoundRobinAddressSet; +import org.neo4j.driver.internal.cluster.RoutingTable; +import org.neo4j.driver.internal.net.BoltServerAddress; + +public final class Matchers +{ + private Matchers() + { + } + + public static Matcher containsRouter( final BoltServerAddress address ) + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( RoutingTable routingTable ) + { + for ( int i = 0; i < routingTable.routerSize(); i++ ) + { + if ( routingTable.nextRouter().equals( address ) ) + { + return true; + } + } + return false; + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "routing table that contains router " ).appendValue( address ); + } + }; + } + + public static Matcher containsReader( final BoltServerAddress address ) + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( RoutingTable routingTable ) + { + return contains( routingTable.readers(), address ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "routing table that contains reader " ).appendValue( address ); + } + }; + } + + public static Matcher containsWriter( final BoltServerAddress address ) + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( RoutingTable routingTable ) + { + return contains( routingTable.writers(), address ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "routing table that contains writer " ).appendValue( address ); + } + }; + } + + private static boolean contains( RoundRobinAddressSet set, BoltServerAddress address ) + { + for ( int i = 0; i < set.size(); i++ ) + { + if ( set.next().equals( address ) ) + { + return true; + } + } + return false; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java b/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java index 91e565a871..cdf4e5239c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java @@ -195,7 +195,7 @@ public void useSessionAfterDriverIsClosed() throws Exception } catch ( Exception e ) { - assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); } assertEquals( 0, readServer.exitStatus() );