Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected void finalize() throws Throwable

private void logLeakIfNeeded()
{
Boolean isOpen = Futures.getBlocking( currentConnectionIsOpen() );
Boolean isOpen = Futures.getBlocking( hasOpenConnection() );
if ( isOpen )
{
logger.error( "Neo4j Session object leaked, please ensure that your application" +
Expand Down
106 changes: 72 additions & 34 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public class NetworkSession implements Session

private volatile Bookmark bookmark = Bookmark.empty();
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
private volatile CompletionStage<Connection> readConnectionStage = completedFuture( null );
private volatile CompletionStage<Connection> writeConnectionStage = completedFuture( null );

private final AtomicBoolean open = new AtomicBoolean( true );

Expand Down Expand Up @@ -250,14 +251,13 @@ public void reset()

private CompletionStage<Void> resetAsync()
{
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
.thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} );
return existingTransactionOrNull().thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} ).thenCompose( ignore -> releaseConnections() );
}

@Override
Expand All @@ -266,12 +266,14 @@ public TypeSystem typeSystem()
return InternalTypeSystem.TYPE_SYSTEM;
}

CompletionStage<Boolean> currentConnectionIsOpen()
CompletionStage<Boolean> hasOpenConnection()
{
return hasOpenConnection( readConnectionStage ).thenCompose( readConnectionOpen ->
readConnectionOpen ? completedFuture( true ) : hasOpenConnection( writeConnectionStage ) );
}

private CompletionStage<Boolean> hasOpenConnection( CompletionStage<Connection> connectionStage )
{
if ( connectionStage == null )
{
return completedFuture( false );
}
return connectionStage.handle( ( connection, error ) ->
error == null && // no acquisition error
connection != null && // some connection has actually been acquired
Expand Down Expand Up @@ -447,18 +449,36 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m

private CompletionStage<Connection> acquireConnection( AccessMode mode )
{
// memorize in local so same instance is transformed and used in callbacks
CompletionStage<Connection> currentAsyncConnectionStage = connectionStage;
if ( mode == AccessMode.READ )
{
CompletionStage<Connection> currentConnectionStage = readConnectionStage;
readConnectionStage = acquireConnection( currentConnectionStage, mode );
return readConnectionStage;
}
else if ( mode == AccessMode.WRITE )
{
CompletionStage<Connection> currentConnectionStage = writeConnectionStage;
writeConnectionStage = acquireConnection( currentConnectionStage, mode );
return writeConnectionStage;
}
else
{
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
}
}

connectionStage = currentAsyncConnectionStage
private CompletionStage<Connection> acquireConnection( CompletionStage<Connection> currentConnectionStage,
AccessMode mode )
{
return currentConnectionStage
.exceptionally( error -> null ) // handle previous acquisition failures
.thenCompose( connection ->
{
if ( connection != null && connection.tryMarkInUse() )
{
// previous acquisition attempt was successful and connection has not been released yet
// continue using same connection
return currentAsyncConnectionStage;
return currentConnectionStage;
}
else
{
Expand All @@ -467,13 +487,11 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
return connectionProvider.acquireConnection( mode );
}
} );

return connectionStage;
}

private CompletionStage<Void> releaseResources()
{
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
return rollbackTransaction().thenCompose( ignore -> releaseConnections() );
}

private CompletionStage<Void> rollbackTransaction()
Expand All @@ -493,16 +511,41 @@ private CompletionStage<Void> rollbackTransaction()
} );
}

private CompletionStage<Void> releaseConnectionNow()
private CompletionStage<Void> releaseConnections()
{
CompletableFuture<Void> result = new CompletableFuture<>();
releaseConnection( readConnectionStage ).whenComplete( ( ignore1, readConnectionError ) ->
releaseConnection( writeConnectionStage ).whenComplete( ( ignore2, writeConnectionError ) ->
afterConnectionsReleased( result, readConnectionError, writeConnectionError ) ) );
return result;
}

private CompletionStage<Void> releaseConnection( CompletionStage<Connection> connectionStage )
{
return existingConnectionOrNull().thenCompose( connection ->
return connectionStage.exceptionally( error -> null )
.thenCompose( connection -> connection == null ? completedFuture( null ) : connection.releaseNow() );
}

private void afterConnectionsReleased( CompletableFuture<Void> result, Throwable readConnectionError,
Throwable writeConnectionError )
{
if ( readConnectionError != null && writeConnectionError != null )
{
if ( connection != null )
{
return connection.releaseNow();
}
return completedFuture( null );
} );
readConnectionError.addSuppressed( writeConnectionError );
result.completeExceptionally( readConnectionError );
}
else if ( readConnectionError != null )
{
result.completeExceptionally( readConnectionError );
}
else if ( writeConnectionError != null )
{
result.completeExceptionally( writeConnectionError );
}
else
{
result.complete( null );
}
}

private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
Expand Down Expand Up @@ -535,11 +578,6 @@ private CompletionStage<ExplicitTransaction> existingTransactionOrNull()
.thenApply( tx -> tx != null && tx.isOpen() ? tx : null );
}

private CompletionStage<Connection> existingConnectionOrNull()
{
return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures
}

private void ensureSessionIsOpen()
{
if ( !open.get() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.internal.util.Supplier;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Session;
Expand Down Expand Up @@ -74,6 +73,7 @@
import static org.neo4j.driver.v1.AccessMode.READ;
import static org.neo4j.driver.v1.AccessMode.WRITE;
import static org.neo4j.driver.v1.util.TestUtil.connectionMock;
import static org.neo4j.driver.v1.util.TestUtil.setupSuccessfulPullAll;

public class NetworkSessionTest
{
Expand All @@ -88,8 +88,6 @@ public class NetworkSessionTest
public void setUp()
{
connection = connectionMock();
when( connection.releaseNow() ).thenReturn( completedFuture( null ) );
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
connectionProvider = mock( ConnectionProvider.class );
when( connectionProvider.acquireConnection( any( AccessMode.class ) ) )
.thenReturn( completedFuture( connection ) );
Expand Down Expand Up @@ -252,7 +250,7 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun()
public void releasesOpenConnectionUsedForRunWhenSessionIsClosed()
{
String query = "RETURN 1";
setupSuccessfulPullAll( query );
setupSuccessfulPullAll( connection, query );

session.run( query );

Expand Down Expand Up @@ -357,7 +355,7 @@ public void updatesBookmarkWhenTxIsClosed()
public void releasesConnectionWhenTxIsClosed()
{
String query = "RETURN 42";
setupSuccessfulPullAll( query );
setupSuccessfulPullAll( connection, query );

Transaction tx = session.beginTransaction();
tx.run( query );
Expand Down Expand Up @@ -603,6 +601,25 @@ public void transactionShouldBeRolledBackAfterSessionReset()
assertFalse( tx.isOpen() );
}

@Test
public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset()
{
NetworkSession session = newSession( connectionProvider, READ );
Transaction tx = session.beginTransaction();

assertTrue( tx.isOpen() );
when( connection.releaseNow() ).then( invocation ->
{
// verify that tx is not open when connection is released
assertFalse( tx.isOpen() );
return completedFuture( null );
} );

session.reset();

verify( connection ).releaseNow();
}

@Test
public void shouldHaveNullLastBookmarkInitially()
{
Expand Down Expand Up @@ -755,6 +772,32 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection()
verifyBeginTx( connection, times( 1 ) );
}

@Test
public void shouldNotUseReadConnectionForWriteAccessMode()
{
String readQuery = "RETURN 1";
String writeQuery = "CREATE ()";

Connection readConnection = connectionMock();
Connection writeConnection = connectionMock();
when( readConnection.tryMarkInUse() ).thenReturn( true ); // allow read connection to always be used
when( writeConnection.tryMarkInUse() ).thenReturn( true ); // allow write connection to always be used
setupSuccessfulPullAll( readConnection, readQuery );
setupSuccessfulPullAll( writeConnection, writeQuery );
when( connectionProvider.acquireConnection( READ ) ).thenReturn( completedFuture( readConnection ) );
when( connectionProvider.acquireConnection( WRITE ) ).thenReturn( completedFuture( writeConnection ) );

session.run( readQuery ); // query is executed with default READ access mode
session.writeTransaction( tx -> tx.run( writeQuery ) ); // write tx is executed with WRITE access mode

// auto-commit query and write tx should've used different connections
verifyRunAndFlush( readConnection, writeQuery, never() );
verifyRunAndFlush( writeConnection, readQuery, never() );

verifyRunAndFlush( readConnection, readQuery, times( 1 ) );
verifyRunAndFlush( writeConnection, writeQuery, times( 1 ) );
}

private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode )
{
NetworkSession session = newSession( connectionProvider, sessionMode );
Expand Down Expand Up @@ -1030,16 +1073,6 @@ private static void setupFailingBegin( Connection connection, Throwable error )
} ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() );
}

private void setupSuccessfulPullAll( String query )
{
doAnswer( invocation ->
{
ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class );
pullAllHandler.onSuccess( emptyMap() );
return null;
} ).when( connection ).runAndFlush( eq( query ), eq( emptyMap() ), any(), any() );
}

private static class TxWork implements TransactionWork<Integer>
{
final int result;
Expand Down
7 changes: 6 additions & 1 deletion driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public final class TestUtil
{
Expand Down Expand Up @@ -157,10 +160,12 @@ public static Connection connectionMock()
setupSuccessfulPullAll( connection, "COMMIT" );
setupSuccessfulPullAll( connection, "ROLLBACK" );
setupSuccessfulPullAll( connection, "BEGIN" );
when( connection.releaseNow() ).thenReturn( completedFuture( null ) );
when( connection.serverVersion() ).thenReturn( ServerVersion.vInDev );
return connection;
}

private static void setupSuccessfulPullAll( Connection connection, String statement )
public static void setupSuccessfulPullAll( Connection connection, String statement )
{
doAnswer( invocation ->
{
Expand Down