From 2af94d038b9e12ec9c699335c2330e9c6e9e4833 Mon Sep 17 00:00:00 2001 From: Zhen Date: Mon, 8 Aug 2016 17:20:52 +0200 Subject: [PATCH 1/3] Make sessioin.reset thread-safe Sync on `enqueue` and `flush` to avoid sending `reset` and other messages at the same time. Also changed response handler stream collector queue to be concurrent queue so that adding into the queue (sending) and removing from the queue (receiving) can be done safely concurrently. Keep `ack_fail` muted when `reset` is already send but not replied yet. --- .../neo4j/driver/internal/NetworkSession.java | 20 +++++++++++++++++-- .../driver/internal/net/SocketConnection.java | 4 ++-- .../internal/net/SocketResponseHandler.java | 4 ++-- .../driver/v1/integration/SessionIT.java | 14 +++++++++---- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index c56d1a3f7b..1a6cfee89b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -59,6 +59,7 @@ public void run() private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); + private AtomicBoolean markedToClose = new AtomicBoolean( false ); NetworkSession( Connection connection, Logger logger ) { @@ -108,13 +109,16 @@ public void reset() ensureNoUnrecoverableError(); ensureConnectionIsOpen(); - connection.resetAsync(); + if( markedToClose.compareAndSet( false, true ) ) + { + connection.resetAsync(); + } } @Override public boolean isOpen() { - return isOpen.get(); + return isOpen.get() && !markedToClose.get(); } @Override @@ -263,4 +267,16 @@ private void ensureConnectionIsOpen() "Please close this session and retry your statement in another new session." ); } } + + private void ensureSessionIsOpen() + { + if( !isOpen() ) + { + throw new ClientException( + "No more interaction with this session is allowed " + + "as the current session is already closed or marked as closed. " + + "You get this error either because you have a bad reference to a session that has already be closed " + + "or you are trying to reuse a session that you have called `reset` on it." ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 5dc460bf22..27d451f17c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -113,7 +113,7 @@ public void sync() } @Override - public void flush() + public synchronized void flush() { try { @@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException ) } } - private void queueMessage( Message msg, Collector collector ) + private synchronized void queueMessage( Message msg, Collector collector ) { pendingMessages.add( msg ); responseHandler.appendResultCollector( collector ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java index c3eb327114..b2289fdff6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java @@ -18,9 +18,9 @@ */ package org.neo4j.driver.internal.net; -import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.neo4j.driver.internal.messaging.MessageHandler; import org.neo4j.driver.internal.spi.Collector; @@ -39,7 +39,7 @@ public class SocketResponseHandler implements MessageHandler { - private final Queue collectors = new LinkedList<>(); + private final Queue collectors = new ConcurrentLinkedQueue<>(); /** If a failure occurs, the error gets stored here */ private Neo4jException error; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index c08ecc891f..15e9bc7bad 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -105,7 +105,8 @@ public void shouldKillLongRunningStatement() throws Throwable final int killTimeout = 1; // 1s long startTime = -1, endTime; - try( final Session session = driver.session() ) + final Session session = driver.session(); + try { StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", @@ -123,13 +124,18 @@ public void shouldKillLongRunningStatement() throws Throwable { endTime = System.currentTimeMillis(); assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + assertFalse( session.isOpen() ); } catch ( Exception e ) { fail( "Should be a Neo4jException" ); } + finally + { + session.close(); + } } @Test @@ -168,7 +174,7 @@ public void shouldKillLongStreamingResult() throws Throwable assertThat( recordCount, greaterThan(1) ); assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished } } @@ -190,7 +196,7 @@ public void run() } finally { - session.reset(); // kill the session after timeout + session.reset(); // reset the session after timeout } } } ).start(); From 0bb32e78fe7e1dbbc1e0734fc01bf7959106537a Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Wed, 7 Sep 2016 16:54:08 +0200 Subject: [PATCH 2/3] Tests for allowing more statements and tx after reset --- .../neo4j/driver/internal/NetworkSession.java | 11 ++-- .../driver/v1/integration/SessionIT.java | 51 ++++++++++++++++--- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 1a6cfee89b..0bd005f7c8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -59,7 +59,6 @@ public void run() private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); - private AtomicBoolean markedToClose = new AtomicBoolean( false ); NetworkSession( Connection connection, Logger logger ) { @@ -106,19 +105,17 @@ public StatementResult run( Statement statement ) public void reset() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureConnectionIsOpen(); - if( markedToClose.compareAndSet( false, true ) ) - { - connection.resetAsync(); - } + connection.resetAsync(); } @Override public boolean isOpen() { - return isOpen.get() && !markedToClose.get(); + return isOpen.get(); } @Override @@ -206,6 +203,7 @@ public TypeSystem typeSystem() private void ensureConnectionIsValidBeforeRunningSession() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureNoOpenTransactionBeforeRunningSession(); ensureConnectionIsOpen(); @@ -213,6 +211,7 @@ private void ensureConnectionIsValidBeforeRunningSession() private void ensureConnectionIsValidBeforeOpeningTransaction() { + ensureSessionIsOpen(); ensureNoUnrecoverableError(); ensureNoOpenTransactionBeforeOpeningTransaction(); ensureConnectionIsOpen(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 15e9bc7bad..8b3abde61c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -21,12 +21,7 @@ import org.junit.Rule; import org.junit.Test; -import org.neo4j.driver.v1.AuthToken; -import org.neo4j.driver.v1.AuthTokens; -import org.neo4j.driver.v1.Driver; -import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.*; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.util.TestNeo4j; @@ -126,7 +121,6 @@ public void shouldKillLongRunningStatement() throws Throwable assertTrue( startTime > 0 ); assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished - assertFalse( session.isOpen() ); } catch ( Exception e ) { @@ -201,4 +195,47 @@ public void run() } } ).start(); } + + @Test + public void shouldAllowMoreStatementAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + + session.run( "Return 1" ).consume(); + + // When reset the state of this session + session.reset(); + + // Then can run successfully more statements without any error + session.run( "Return 2" ).consume(); + } + } + + @Test + public void shouldAllowMoreTxAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + try( Transaction tx = session.beginTransaction() ) + { + tx.run("Return 1"); + tx.success(); + } + + // When reset the state of this session + session.reset(); + + // Then can run more Tx + try( Transaction tx = session.beginTransaction() ) + { + tx.run("Return 2"); + tx.success(); + } + } + } } From 58177f4cb386b41486010fd4ab0b189f60e2789c Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Thu, 8 Sep 2016 11:52:02 +0200 Subject: [PATCH 3/3] Invalid current tx when session.reset is called to avoid running more tx in the reseted tx. Sync on tx.markToClose and tx.run to enforce no more statement in the current tx when tx is already failed (reset). --- .../driver/internal/ExplicitTransaction.java | 4 ++-- .../neo4j/driver/internal/NetworkSession.java | 4 ++++ .../driver/v1/integration/SessionIT.java | 23 +++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index db61edf2d5..c80da4cb5f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -170,7 +170,7 @@ public StatementResult run( String statementTemplate, Record statementParameters } @Override - public StatementResult run( Statement statement ) + public synchronized StatementResult run( Statement statement ) { ensureNotFailed(); @@ -217,7 +217,7 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - public void markToClose() + public synchronized void markToClose() { state = State.FAILED; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 0bd005f7c8..fa636ccb5f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -109,6 +109,10 @@ public void reset() ensureNoUnrecoverableError(); ensureConnectionIsOpen(); + if( currentTransaction != null ) + { + currentTransaction.markToClose(); + } connection.resetAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 8b3abde61c..ecba230bcf 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -27,6 +27,7 @@ import org.neo4j.driver.v1.util.TestNeo4j; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -238,4 +239,26 @@ public void shouldAllowMoreTxAfterSessionReset() } } } + + @Test + public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() + { + // Given + try( Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session() ) + { + try( Transaction tx = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + // Then + tx.run( "Return 1" ); + fail( "Should not allow tx run as tx is already failed." ); + } + catch( Exception e ) + { + assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); + } + } + } }