diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index db61edf2d5..c80da4cb5f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -170,7 +170,7 @@ public StatementResult run( String statementTemplate, Record statementParameters } @Override - public StatementResult run( Statement statement ) + public synchronized StatementResult run( Statement statement ) { ensureNotFailed(); @@ -217,7 +217,7 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - public void markToClose() + public synchronized void markToClose() { state = State.FAILED; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index c56d1a3f7b..fa636ccb5f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -105,9 +105,14 @@ public StatementResult run( Statement statement ) public void reset() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureConnectionIsOpen(); + if( currentTransaction != null ) + { + currentTransaction.markToClose(); + } connection.resetAsync(); } @@ -202,6 +207,7 @@ public TypeSystem typeSystem() private void ensureConnectionIsValidBeforeRunningSession() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureNoOpenTransactionBeforeRunningSession(); ensureConnectionIsOpen(); @@ -209,6 +215,7 @@ private void ensureConnectionIsValidBeforeRunningSession() private void ensureConnectionIsValidBeforeOpeningTransaction() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureNoOpenTransactionBeforeOpeningTransaction(); ensureConnectionIsOpen(); @@ -263,4 +270,16 @@ private void ensureConnectionIsOpen() "Please close this session and retry your statement in another new session." ); } } + + private void ensureSessionIsOpen() + { + if( !isOpen() ) + { + throw new ClientException( + "No more interaction with this session is allowed " + + "as the current session is already closed or marked as closed. " + + "You get this error either because you have a bad reference to a session that has already be closed " + + "or you are trying to reuse a session that you have called `reset` on it." ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 5dc460bf22..27d451f17c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -113,7 +113,7 @@ public void sync() } @Override - public void flush() + public synchronized void flush() { try { @@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException ) } } - private void queueMessage( Message msg, Collector collector ) + private synchronized void queueMessage( Message msg, Collector collector ) { pendingMessages.add( msg ); responseHandler.appendResultCollector( collector ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java index c3eb327114..b2289fdff6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java @@ -18,9 +18,9 @@ */ package org.neo4j.driver.internal.net; -import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.neo4j.driver.internal.messaging.MessageHandler; import org.neo4j.driver.internal.spi.Collector; @@ -39,7 +39,7 @@ public class SocketResponseHandler implements MessageHandler { - private final Queue collectors = new LinkedList<>(); + private final Queue collectors = new ConcurrentLinkedQueue<>(); /** If a failure occurs, the error gets stored here */ private Neo4jException error; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index c08ecc891f..ecba230bcf 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -21,17 +21,13 @@ import org.junit.Rule; import org.junit.Test; -import org.neo4j.driver.v1.AuthToken; -import org.neo4j.driver.v1.AuthTokens; -import org.neo4j.driver.v1.Driver; -import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.*; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.util.TestNeo4j; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -105,7 +101,8 @@ public void shouldKillLongRunningStatement() throws Throwable final int killTimeout = 1; // 1s long startTime = -1, endTime; - try( final Session session = driver.session() ) + final Session session = driver.session(); + try { StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", @@ -123,13 +120,17 @@ public void shouldKillLongRunningStatement() throws Throwable { endTime = System.currentTimeMillis(); assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished } catch ( Exception e ) { fail( "Should be a Neo4jException" ); } + finally + { + session.close(); + } } @Test @@ -168,7 +169,7 @@ public void shouldKillLongStreamingResult() throws Throwable assertThat( recordCount, greaterThan(1) ); assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished } } @@ -190,9 +191,74 @@ public void run() } finally { - session.reset(); // kill the session after timeout + session.reset(); // reset the session after timeout } } } ).start(); } + + @Test + public void shouldAllowMoreStatementAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + + session.run( "Return 1" ).consume(); + + // When reset the state of this session + session.reset(); + + // Then can run successfully more statements without any error + session.run( "Return 2" ).consume(); + } + } + + @Test + public void shouldAllowMoreTxAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + try( Transaction tx = session.beginTransaction() ) + { + tx.run("Return 1"); + tx.success(); + } + + // When reset the state of this session + session.reset(); + + // Then can run more Tx + try( Transaction tx = session.beginTransaction() ) + { + tx.run("Return 2"); + tx.success(); + } + } + } + + @Test + public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + try( Transaction tx = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + // Then + tx.run( "Return 1" ); + fail( "Should not allow tx run as tx is already failed." ); + } + catch( Exception e ) + { + assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); + } + } + } }