diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index 042176e0bf..e0ebb45bb9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -98,6 +98,14 @@ public StatementResult run( Statement statement ) return cursor; } + public void reset() + { + ensureNoUnrecoverableError(); + ensureConnectionIsOpen(); + + connection.resetAsync(); + } + @Override public boolean isOpen() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index a054639d1b..89ce975d1a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -201,6 +201,18 @@ public boolean hasUnrecoverableErrors() return delegate.hasUnrecoverableErrors(); } + @Override + public void resetAsync() + { + delegate.resetAsync(); + } + + @Override + public boolean isInterrupted() + { + return delegate.isInterrupted(); + } + private void markAsAvailable() { inUse.set( false ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index df0951587b..a04502600c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.messaging.InitMessage; import org.neo4j.driver.internal.messaging.Message; @@ -30,8 +31,8 @@ import org.neo4j.driver.internal.messaging.RunMessage; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; @@ -45,6 +46,7 @@ public class SocketConnection implements Connection { private final Queue pendingMessages = new LinkedList<>(); private final SocketResponseHandler responseHandler; + private AtomicBoolean interrupted = new AtomicBoolean( false ); private final SocketClient socket; @@ -206,4 +208,27 @@ public boolean hasUnrecoverableErrors() { throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." ); } + + @Override + public void resetAsync() + { + if( interrupted.compareAndSet( false, true ) ) + { + queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable() + { + @Override + public void run() + { + interrupted.set( false ); + } + } ) ); + flush(); + } + } + + @Override + public boolean isInterrupted() + { + return interrupted.get(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java index 3e7e4d04ef..ea62973f20 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java @@ -244,5 +244,4 @@ public void clearError() { error = null; } - } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java index 54784d9551..f0b63859fc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java @@ -211,6 +211,25 @@ public boolean hasUnrecoverableErrors() return unrecoverableErrorsOccurred; } + @Override + public void resetAsync() + { + try + { + delegate.resetAsync(); + } + catch( RuntimeException e ) + { + onDelegateException( e ); + } + } + + @Override + public boolean isInterrupted() + { + return delegate.isInterrupted(); + } + public void dispose() { delegate.close(); @@ -228,7 +247,7 @@ private void onDelegateException( RuntimeException e ) { unrecoverableErrorsOccurred = true; } - else + else if( !isInterrupted() ) { ackFailure(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index a88a27910b..ecdbac18e0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -106,4 +106,15 @@ public interface Connection extends AutoCloseable boolean hasUnrecoverableErrors(); + + /** + * Asynchronously sending reset to the socket output channel. + */ + void resetAsync(); + + /** + * Return true if the current session statement execution has been interrupted by another thread, otherwise false. + * @return true if the current session statement execution has been interrupted by another thread, otherwise false + */ + boolean isInterrupted(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java b/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java index aeb1fc22e9..09a459b492 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java @@ -60,9 +60,22 @@ public void doneIgnored() } }; + StreamCollector RESET = new ResetStreamCollector(); - StreamCollector RESET = new NoOperationStreamCollector() + class ResetStreamCollector extends NoOperationStreamCollector { + private final Runnable doneSuccessCallBack; + + public ResetStreamCollector() + { + this( null ); + } + + public ResetStreamCollector( Runnable doneSuccessCallBack ) + { + this.doneSuccessCallBack = doneSuccessCallBack; + } + @Override public void doneFailure( Neo4jException error ) { @@ -76,7 +89,17 @@ public void doneIgnored() throw new ClientException( "Invalid server response message `IGNORED` received for client message `RESET`." ); } - }; + + @Override + public void doneSuccess() + { + if( doneSuccessCallBack != null ) + { + doneSuccessCallBack.run(); + } + } + } + class NoOperationStreamCollector implements StreamCollector { diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index e5be539dd9..80a44dd44b 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -58,6 +58,13 @@ public interface Session extends Resource, StatementRunner */ Transaction beginTransaction(); + /** + * Reset the current session. This sends an immediate RESET signal to the server which both interrupts + * any statement that is currently executing and ignores any subsequently queued statements. Following + * the reset, the current transaction will have been rolled back and any outstanding failures will + * have been acknowledged. + */ + void reset(); /** * Signal that you are done using this session. In the default driver usage, closing diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 4ed59dcbf7..48aed49013 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -26,9 +26,17 @@ import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.util.TestNeo4j; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.driver.v1.Values.parameters; public class SessionIT { @@ -78,4 +86,104 @@ public void shouldHandleNullAuthToken() throws Throwable // Then assertFalse( session.isOpen() ); } + + @Test + public void shouldKillLongRunningStatement() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + Driver driver = GraphDatabase.driver( neo4j.uri() ); + + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + + try( final Session session = driver.session() ) + { + StatementResult result = + session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + result.consume();// blocking to run the statement + + fail("Should have got an exception about statement get killed."); + } + catch( ClientException e ) + { + endTime = System.currentTimeMillis(); + assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") ); + + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + } + + @Test + public void shouldKillLongStreamingResult() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + Driver driver = GraphDatabase.driver( neo4j.uri() ); + + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + int recordCount = 0; + + try( final Session session = driver.session() ) + { + StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + while( result.hasNext() ) + { + result.next(); + recordCount++; + } + + fail("Should have got an exception about statement get killed."); + } + catch( ClientException e ) + { + endTime = System.currentTimeMillis(); + assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") ); + assertThat( recordCount, greaterThan(1) ); + + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + } + + private void resetSessionAfterTimeout( final Session session, final int timeout ) + { + new Thread( new Runnable() + { + @Override + public void run() + { + try + { + Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds + } + catch ( InterruptedException e ) + { + e.printStackTrace(); + } + finally + { + session.reset(); // kill the session after timeout + } + } + } ).start(); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java b/driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java index a6e22638e1..8a516133e0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java @@ -164,7 +164,7 @@ public static void copyFile( File srcFile, File dstFile ) throws IOException catch ( IOException e ) { // Because the message from this cause may not mention which file it's about - throw new IOException( "Could not copy '" + srcFile + "' to '" + dstFile + "'", e ); + throw new IOException( "Could not copy '" + srcFile.getCanonicalPath() + "' to '" + dstFile.getCanonicalPath() + "'", e ); } finally { diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java index 1f32e2ce18..9b16ac72f1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java @@ -123,4 +123,14 @@ public void updateEncryptionKeyAndCert( File key, File cert ) throws Exception FileTools.copyFile( cert, Neo4jSettings.DEFAULT_TLS_CERT_FILE ); runner.forceToRestart(); // needs to force to restart as no configuration changed } + + public void ensureProcedures( String jarName ) throws IOException + { + File procedureJar = new File( Neo4jRunner.NEO4J_HOME, "plugins/" + jarName ); + if( !procedureJar.exists() ) + { + FileTools.copyFile( new File( "src/test/resources", jarName ), procedureJar ); + runner.forceToRestart(); // needs to force to restart as no configuration changed + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java index 6afe4372b2..05a63501bd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java @@ -100,6 +100,12 @@ public Transaction beginTransaction() return realSession.beginTransaction(); } + @Override + public void reset() + { + realSession.reset(); + } + @Override public StatementResult run( String statementText, Map statementParameters ) { diff --git a/driver/src/test/resources/longRunningStatement.jar b/driver/src/test/resources/longRunningStatement.jar new file mode 100644 index 0000000000..a5416792a7 Binary files /dev/null and b/driver/src/test/resources/longRunningStatement.jar differ