From 8801db9413df501703b516953c986fbeed542e1a Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 17 Oct 2017 15:21:06 +0200 Subject: [PATCH 1/7] Session#closeAsync() waits result to be fetched This commit aligns behaviour of `Session#close()` and `Session#closeAsync()` so that later one also waits for the latest result to be fully fetched. It makes possible to consume result after session is closed and allows propagation of query errors in `#closeAsync()`. Errors are propagated only if they were not consumed by reading the result cursor. --- .../neo4j/driver/internal/NetworkSession.java | 66 ++++++++++++------- .../async/InternalStatementResultCursor.java | 6 +- .../driver/internal/async/QueryRunner.java | 6 +- .../handlers/PullAllResponseHandler.java | 34 +++++----- .../driver/internal/NetworkSessionTest.java | 11 +++- .../driver/v1/integration/SessionAsyncIT.java | 45 +++++++++++++ .../driver/v1/integration/SessionIT.java | 49 ++++++++++++++ 7 files changed, 167 insertions(+), 50 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 0bbe5fdc71..9852631dc9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -153,43 +153,42 @@ public boolean isOpen() @Override public void close() { - if ( open.compareAndSet( true, false ) ) - { - // todo: should closeAsync() also do this waiting for buffered result? - // todo: unit test result buffering? - getBlocking( lastResultStage - .exceptionally( error -> null ) - .thenCompose( this::ensureBuffered ) - .thenCompose( error -> releaseResources().thenApply( ignore -> - { - if ( error != null ) - { - throw new CompletionException( error ); - } - return null; - } ) ) ); - } + getBlocking( closeAsync() ); } @Override public CompletionStage closeAsync() { - // todo: wait for buffered result? if ( open.compareAndSet( true, false ) ) { - return releaseResources(); + return lastResultStage.thenCompose( this::receiveError ) + .exceptionally( error -> error ) // connection acquisition or RUN failed, propagate error + .thenCompose( error -> releaseResources().thenApply( connectionReleased -> + { + Throwable queryError = Futures.completionErrorCause( error ); + if ( queryError != null && connectionReleased ) + { + // connection has been acquired and there is an unconsumed error in result cursor + throw new CompletionException( queryError ); + } + else + { + // either connection acquisition failed or + // there are no unconsumed errors in the result cursor + return null; + } + } ) ); } return completedFuture( null ); } - // todo: test this method - CompletionStage ensureBuffered( InternalStatementResultCursor cursor ) + private CompletionStage receiveError( InternalStatementResultCursor cursor ) { if ( cursor == null ) { return completedFuture( null ); } - return cursor.resultBuffered(); + return cursor.failureAsync(); } @Override @@ -479,11 +478,21 @@ private CompletionStage acquireConnection( AccessMode mode ) return connectionStage; } - private CompletionStage releaseResources() + /** + * Rollback existing transaction and release existing connection. + * + * @return {@link CompletionStage} as returned by {@link #releaseConnectionNow()}. + */ + private CompletionStage releaseResources() { return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); } + /** + * Rollback existing transaction, if any. Errors will be ignored. + * + * @return {@link CompletionStage} completed with {@code null} when transaction rollback completes or fails. + */ private CompletionStage rollbackTransaction() { return existingTransactionOrNull().thenCompose( tx -> @@ -501,15 +510,22 @@ private CompletionStage rollbackTransaction() } ); } - private CompletionStage releaseConnectionNow() + /** + * Release existing connection or do nothing when none has been acquired. + * + * @return {@link CompletionStage} completed with {@code true} when there was a connection and it has been released, + * {@link CompletionStage} completed with {@code false} when connection has not been acquired and nothing has been + * released. + */ + private CompletionStage releaseConnectionNow() { return existingConnectionOrNull().thenCompose( connection -> { if ( connection != null ) { - return connection.releaseNow(); + return connection.releaseNow().thenApply( ignore -> true ); } - return completedFuture( null ); + return completedFuture( false ); } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index eecbdb2172..1f2e0ae6f2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -34,6 +34,7 @@ import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.Functions; +// todo: unit tests public class InternalStatementResultCursor implements StatementResultCursor { // todo: maybe smth better than these two string constants? @@ -142,10 +143,9 @@ public CompletionStage> listAsync( Function mapFunction ) return resultFuture; } - // todo: test this method and give it better name - public CompletionStage resultBuffered() + public CompletionStage failureAsync() { - return pullAllHandler.resultBuffered(); + return pullAllHandler.failureAsync(); } private void internalForEachAsync( Consumer action, CompletableFuture resultFuture ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java index ebcf7031af..9097e735ea 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java @@ -48,8 +48,7 @@ public static CompletionStage runAsBlocking( Conn } public static CompletionStage runAsBlocking( Connection connection, - Statement statement, - ExplicitTransaction tx ) + Statement statement, ExplicitTransaction tx ) { return runAsAsync( connection, statement, tx, false ); } @@ -61,8 +60,7 @@ public static CompletionStage runAsAsync( Connect } public static CompletionStage runAsAsync( Connection connection, - Statement statement, - ExplicitTransaction tx ) + Statement statement, ExplicitTransaction tx ) { return runAsAsync( connection, statement, tx, true ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index cee8595b50..7c617ab0a0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -49,6 +49,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.failedFuture; +// todo: unit tests public abstract class PullAllResponseHandler implements ResponseHandler { private static final boolean TOUCH_AUTO_READ = false; @@ -59,17 +60,15 @@ public abstract class PullAllResponseHandler implements ResponseHandler private final Queue records = new LinkedList<>(); - // todo: use presence of summary as a "finished" indicator and remove this field private boolean finished; private Throwable failure; private ResultSummary summary; private CompletableFuture recordFuture; private CompletableFuture summaryFuture; - private CompletableFuture resultBufferedFuture; + private CompletableFuture failureFuture; - public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, - Connection connection ) + public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection ) { this.statement = requireNonNull( statement ); this.runResponseHandler = requireNonNull( runResponseHandler ); @@ -86,7 +85,7 @@ public synchronized void onSuccess( Map metadata ) completeRecordFuture( null ); completeSummaryFuture( summary ); - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } protected abstract void afterSuccess(); @@ -104,7 +103,7 @@ public synchronized void onFailure( Throwable error ) { // error propagated through record future, complete other two completeSummaryFuture( summary ); - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } else { @@ -112,13 +111,14 @@ public synchronized void onFailure( Throwable error ) if ( failedSummaryFuture ) { // error propagated through summary future, complete other one - completeResultBufferedFuture( null ); + completeFailureFuture( null ); } else { - boolean completedResultBufferedFuture = completeResultBufferedFuture( error ); - if ( !completedResultBufferedFuture ) + boolean completedFailureFuture = completeFailureFuture( error ); + if ( !completedFailureFuture ) { + // error has not been propagated to the user, remember it failure = error; } } @@ -189,7 +189,7 @@ public synchronized CompletionStage summaryAsync() } } - public synchronized CompletionStage resultBuffered() + public synchronized CompletionStage failureAsync() { if ( failure != null ) { @@ -203,11 +203,11 @@ else if ( finished ) } else { - if ( resultBufferedFuture == null ) + if ( failureFuture == null ) { - resultBufferedFuture = new CompletableFuture<>(); + failureFuture = new CompletableFuture<>(); } - return resultBufferedFuture; + return failureFuture; } } @@ -280,12 +280,12 @@ private boolean failSummaryFuture( Throwable error ) return false; } - private boolean completeResultBufferedFuture( Throwable error ) + private boolean completeFailureFuture( Throwable error ) { - if ( resultBufferedFuture != null ) + if ( failureFuture != null ) { - CompletableFuture future = resultBufferedFuture; - resultBufferedFuture = null; + CompletableFuture future = failureFuture; + failureFuture = null; future.complete( error ); return true; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 06053347f9..f20604ec78 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -34,6 +34,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -88,6 +89,7 @@ public void setUp() { connection = connectionMock(); when( connection.releaseNow() ).thenReturn( completedFuture( null ) ); + when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) .thenReturn( completedFuture( connection ) ); @@ -247,8 +249,15 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun() } @Test - public void forceReleasesOpenConnectionUsedForRunWhenSessionIsClosed() + public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { + doAnswer( invocation -> + { + ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); + pullAllHandler.onSuccess( emptyMap() ); + return null; + } ).when( connection ).runAndFlush( eq( "RETURN 1" ), eq( emptyMap() ), any(), any() ); + session.run( "RETURN 1" ); getBlocking( session.closeAsync() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index d7f44729ba..0e91a2ad70 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -723,6 +723,7 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() } @Test +<<<<<< execute( Transaction tx ) assertEquals( 1, countNodesByLabel( "MyNode" ) ); } + @Test + public void shouldPropagateRunFailureWhenClosed() + { + session.runAsync( "RETURN 10 / 0" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureWhenClosed() + { + session.runAsync( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldBePossibleToConsumeResultAfterSessionIsClosed() + { + CompletionStage cursorStage = session.runAsync( "UNWIND range(1, 20000) AS x RETURN x" ); + + getBlocking( session.closeAsync() ); + + StatementResultCursor cursor = getBlocking( cursorStage ); + List ints = getBlocking( cursor.listAsync( record -> record.get( 0 ).asInt() ) ); + assertEquals( 20000, ints.size() ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 20a8d71d42..0d50874911 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1233,6 +1233,55 @@ public String execute( Transaction tx ) } } + @Test + public void shouldPropagateRunFailureWhenClosed() + { + Session session = neo4j.driver().session(); + + session.run( "RETURN 10 / 0" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureWhenClosed() + { + Session session = neo4j.driver().session(); + + session.run( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldBePossibleToConsumeResultAfterSessionIsClosed() + { + StatementResult result; + try ( Session session = neo4j.driver().session() ) + { + result = session.run( "UNWIND range(1, 20000) AS x RETURN x" ); + } + + List ints = result.list( record -> record.get( 0 ).asInt() ); + assertEquals( 20000, ints.size() ); + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); From 2a702c10ba26c4d64f9978a66af22a9e303e8d24 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 17 Oct 2017 15:27:17 +0200 Subject: [PATCH 2/7] Prettier impl of `ExplicitTransaction#isOpen()` Made transaction state enum responsible for knowing if transaction is open or closed. This seems prettier than large boolean expression. --- .../driver/internal/ExplicitTransaction.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 6b2f9c7fda..84ffc17f92 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -56,24 +56,31 @@ public class ExplicitTransaction implements Transaction private enum State { /** The transaction is running with no explicit success or failure marked */ - ACTIVE, + ACTIVE( true ), /** Running, user marked for success, meaning it'll value committed */ - MARKED_SUCCESS, + MARKED_SUCCESS( true ), /** User marked as failed, meaning it'll be rolled back. */ - MARKED_FAILED, + MARKED_FAILED( true ), /** * This transaction has been explicitly terminated by calling {@link Session#reset()}. */ - TERMINATED, + TERMINATED( false ), /** This transaction has successfully committed */ - COMMITTED, + COMMITTED( false ), /** This transaction has been rolled back */ - ROLLED_BACK + ROLLED_BACK( false ); + + final boolean txOpen; + + State( boolean txOpen ) + { + this.txOpen = txOpen; + } } private final Connection connection; @@ -289,7 +296,7 @@ public CompletionStage runAsync( Statement statement ) @Override public boolean isOpen() { - return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED; + return state.txOpen; } private void ensureCanRunQueries() From 5431d50978608511ce0198ed5f6958f66d217bf7 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 18 Oct 2017 13:50:32 +0200 Subject: [PATCH 3/7] Consume failures when tx commit/rollback This commit makes transactions surface unconsumed query errors when doing commit or rollback. It's achieved by keeping track of all result cursors and polling them for unconsumed failures when doing commit or rollback. --- .../driver/internal/ExplicitTransaction.java | 125 +++++--- .../neo4j/driver/internal/NetworkSession.java | 38 +-- .../handlers/PullAllResponseHandler.java | 1 + .../internal/handlers/RunResponseHandler.java | 14 +- .../neo4j/driver/internal/util/Futures.java | 14 + .../driver/internal/NetworkSessionTest.java | 27 +- .../driver/v1/integration/SessionAsyncIT.java | 86 +++++- .../v1/integration/TransactionAsyncIT.java | 275 +++++++++++++++++- .../driver/v1/stress/AsyncWrongQuery.java | 28 +- .../driver/v1/stress/AsyncWrongQueryInTx.java | 29 +- 10 files changed, 530 insertions(+), 107 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 84ffc17f92..f946d4adc7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -18,18 +18,25 @@ */ package org.neo4j.driver.internal; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import org.neo4j.driver.internal.async.InternalStatementResultCursor; import org.neo4j.driver.internal.async.QueryRunner; import org.neo4j.driver.internal.handlers.BeginTxResponseHandler; import org.neo4j.driver.internal.handlers.CommitTxResponseHandler; import org.neo4j.driver.internal.handlers.NoOpResponseHandler; import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.types.InternalTypeSystem; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -43,6 +50,7 @@ import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.util.Futures.completionErrorCause; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.v1.Values.value; @@ -86,6 +94,7 @@ private enum State private final Connection connection; private final NetworkSession session; + private final List> resultCursors = new ArrayList<>(); private volatile Bookmark bookmark = Bookmark.empty(); private volatile State state = State.ACTIVE; @@ -169,7 +178,9 @@ else if ( state == State.TERMINATED ) } else { - return doCommitAsync().whenComplete( transactionClosed( State.COMMITTED ) ); + return receiveFailures() + .thenCompose( failure -> doCommitAsync().handle( handleCommitOrRollback( failure ) ) ) + .whenComplete( transactionClosed( State.COMMITTED ) ); } } @@ -192,38 +203,12 @@ else if ( state == State.TERMINATED ) } else { - return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) ); + return receiveFailures() + .thenCompose( failure -> doRollbackAsync().handle( handleCommitOrRollback( failure ) ) ) + .whenComplete( transactionClosed( State.ROLLED_BACK ) ); } } - private BiConsumer transactionClosed( State newState ) - { - return ( ignore, error ) -> - { - state = newState; - connection.releaseInBackground(); - session.setBookmark( bookmark ); - }; - } - - private CompletionStage doCommitAsync() - { - CompletableFuture commitFuture = new CompletableFuture<>(); - connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, - new CommitTxResponseHandler( commitFuture, this ) ); - - return commitFuture.thenRun( () -> state = State.COMMITTED ); - } - - private CompletionStage doRollbackAsync() - { - CompletableFuture rollbackFuture = new CompletableFuture<>(); - connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, - new RollbackTxResponseHandler( rollbackFuture ) ); - - return rollbackFuture.thenRun( () -> state = State.ROLLED_BACK ); - } - @Override public StatementResult run( String statementText, Value statementParameters ) { @@ -280,23 +265,31 @@ public CompletionStage runAsync( String statementTemplate @Override public StatementResult run( Statement statement ) { - ensureCanRunQueries(); - StatementResultCursor cursor = getBlocking( QueryRunner.runAsBlocking( connection, statement, this ) ); + StatementResultCursor cursor = getBlocking( run( statement, false ) ); return new InternalStatementResult( cursor ); } @Override public CompletionStage runAsync( Statement statement ) { - ensureCanRunQueries(); //noinspection unchecked - return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this ); + return (CompletionStage) run( statement, true ); } - @Override - public boolean isOpen() + private CompletionStage run( Statement statement, boolean asAsync ) { - return state.txOpen; + ensureCanRunQueries(); + CompletionStage result; + if ( asAsync ) + { + result = QueryRunner.runAsAsync( connection, statement, this ); + } + else + { + result = QueryRunner.runAsBlocking( connection, statement, this ); + } + resultCursors.add( result ); + return result; } private void ensureCanRunQueries() @@ -324,6 +317,12 @@ else if ( state == State.TERMINATED ) } } + @Override + public boolean isOpen() + { + return state.txOpen; + } + @Override public TypeSystem typeSystem() { @@ -347,4 +346,56 @@ public void setBookmark( Bookmark bookmark ) this.bookmark = bookmark; } } + + private CompletionStage doCommitAsync() + { + CompletableFuture commitFuture = new CompletableFuture<>(); + ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this ); + connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); + return commitFuture; + } + + private CompletionStage doRollbackAsync() + { + CompletableFuture rollbackFuture = new CompletableFuture<>(); + ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture ); + connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); + return rollbackFuture; + } + + private BiFunction handleCommitOrRollback( Throwable cursorFailure ) + { + return ( ignore, commitOrRollbackError ) -> + { + if ( cursorFailure != null ) + { + throw new CompletionException( completionErrorCause( cursorFailure ) ); + } + else if ( commitOrRollbackError != null ) + { + throw new CompletionException( completionErrorCause( commitOrRollbackError ) ); + } + else + { + return null; + } + }; + } + + private BiConsumer transactionClosed( State newState ) + { + return ( ignore, error ) -> + { + state = newState; + connection.releaseInBackground(); + session.setBookmark( bookmark ); + }; + } + + private CompletionStage receiveFailures() + { + return resultCursors.stream() + .map( stage -> stage.thenCompose( InternalStatementResultCursor::failureAsync ) ) + .reduce( completedFuture( null ), Futures::firstNotNull ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 9852631dc9..d8f0efcb28 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -161,12 +161,13 @@ public CompletionStage closeAsync() { if ( open.compareAndSet( true, false ) ) { - return lastResultStage.thenCompose( this::receiveError ) - .exceptionally( error -> error ) // connection acquisition or RUN failed, propagate error - .thenCompose( error -> releaseResources().thenApply( connectionReleased -> + return lastResultStage + .exceptionally( error -> null ) // ignore connection acquisition failures + .thenCompose( this::receiveFailure ) + .thenCompose( error -> releaseResources().thenApply( ignore -> { Throwable queryError = Futures.completionErrorCause( error ); - if ( queryError != null && connectionReleased ) + if ( queryError != null ) { // connection has been acquired and there is an unconsumed error in result cursor throw new CompletionException( queryError ); @@ -182,7 +183,7 @@ public CompletionStage closeAsync() return completedFuture( null ); } - private CompletionStage receiveError( InternalStatementResultCursor cursor ) + private CompletionStage receiveFailure( InternalStatementResultCursor cursor ) { if ( cursor == null ) { @@ -478,21 +479,11 @@ private CompletionStage acquireConnection( AccessMode mode ) return connectionStage; } - /** - * Rollback existing transaction and release existing connection. - * - * @return {@link CompletionStage} as returned by {@link #releaseConnectionNow()}. - */ - private CompletionStage releaseResources() + private CompletionStage releaseResources() { return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() ); } - /** - * Rollback existing transaction, if any. Errors will be ignored. - * - * @return {@link CompletionStage} completed with {@code null} when transaction rollback completes or fails. - */ private CompletionStage rollbackTransaction() { return existingTransactionOrNull().thenCompose( tx -> @@ -505,27 +496,20 @@ private CompletionStage rollbackTransaction() } ).exceptionally( error -> { Throwable cause = Futures.completionErrorCause( error ); - logger.error( "Failed to rollback active transaction", cause ); + logger.warn( "Active transaction rolled back with an error", cause ); return null; } ); } - /** - * Release existing connection or do nothing when none has been acquired. - * - * @return {@link CompletionStage} completed with {@code true} when there was a connection and it has been released, - * {@link CompletionStage} completed with {@code false} when connection has not been acquired and nothing has been - * released. - */ - private CompletionStage releaseConnectionNow() + private CompletionStage releaseConnectionNow() { return existingConnectionOrNull().thenCompose( connection -> { if ( connection != null ) { - return connection.releaseNow().thenApply( ignore -> true ); + return connection.releaseNow(); } - return completedFuture( false ); + return completedFuture( null ); } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 7c617ab0a0..6a15353b73 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -173,6 +173,7 @@ public synchronized CompletionStage nextAsync() } ); } + // todo: propagate failure from here as well public synchronized CompletionStage summaryAsync() { if ( summary != null ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index 28ec1d8f7d..3b2c1dc130 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -46,13 +46,13 @@ public void onSuccess( Map metadata ) statementKeys = extractKeys( metadata ); resultAvailableAfter = extractResultAvailableAfter( metadata ); - runCompletedFuture.complete( null ); + completeRunFuture(); } @Override public void onFailure( Throwable error ) { - runCompletedFuture.completeExceptionally( error ); + completeRunFuture(); } @Override @@ -71,6 +71,16 @@ public long resultAvailableAfter() return resultAvailableAfter; } + /** + * Complete the given future with {@code null}. Future is never completed exceptionally because callers are only + * interested in when RUN completes and not how. Async API needs to wait for RUN because it needs to access + * statement keys. + */ + private void completeRunFuture() + { + runCompletedFuture.complete( null ); + } + private static List extractKeys( Map metadata ) { Value keysValue = metadata.get( "fields" ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java index d7fc6cd012..25e7c819d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java @@ -26,6 +26,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static java.util.concurrent.CompletableFuture.completedFuture; + public final class Futures { private Futures() @@ -116,4 +118,16 @@ public static Throwable completionErrorCause( Throwable error ) } return error; } + + public static CompletionStage firstNotNull( CompletionStage stage1, CompletionStage stage2 ) + { + return stage1.thenCompose( value -> + { + if ( value != null ) + { + return completedFuture( value ); + } + return stage2; + } ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index f20604ec78..4ed7b2f19e 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -251,14 +251,10 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun() @Test public void releasesOpenConnectionUsedForRunWhenSessionIsClosed() { - doAnswer( invocation -> - { - ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); - pullAllHandler.onSuccess( emptyMap() ); - return null; - } ).when( connection ).runAndFlush( eq( "RETURN 1" ), eq( emptyMap() ), any(), any() ); + String query = "RETURN 1"; + setupSuccessfulPullAll( query ); - session.run( "RETURN 1" ); + session.run( query ); getBlocking( session.closeAsync() ); @@ -360,11 +356,14 @@ public void updatesBookmarkWhenTxIsClosed() @Test public void releasesConnectionWhenTxIsClosed() { + String query = "RETURN 42"; + setupSuccessfulPullAll( query ); + Transaction tx = session.beginTransaction(); - tx.run( "RETURN 1" ); + tx.run( query ); verify( connectionProvider ).acquireConnection( READ ); - verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() ); + verify( connection ).runAndFlush( eq( query ), any(), any(), any() ); tx.close(); verify( connection ).releaseInBackground(); @@ -1031,6 +1030,16 @@ private static void setupFailingBegin( Connection connection, Throwable error ) } ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() ); } + private void setupSuccessfulPullAll( String query ) + { + doAnswer( invocation -> + { + ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class ); + pullAllHandler.onSuccess( emptyMap() ); + return null; + } ).when( connection ).runAndFlush( eq( query ), eq( emptyMap() ), any(), any() ); + } + private static class TxWork implements TransactionWork { final int result; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 0e91a2ad70..29ba15f1f8 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -149,9 +149,10 @@ public void shouldRunQueryWithMultipleResults() @Test public void shouldFailForIncorrectQuery() { + StatementResultCursor cursor = await( session.runAsync( "RETURN" ) ); try { - await( session.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -658,9 +659,10 @@ public void shouldRunAfterRunFailureToAcquireConnection() { neo4j.killDb(); + StatementResultCursor cursor1 = getBlocking( session.runAsync( "RETURN 42" ) ); try { - getBlocking( session.runAsync( "RETURN 42" ) ); + getBlocking( cursor1.nextAsync() ); fail( "Exception expected" ); } catch ( ServiceUnavailableException e ) @@ -670,8 +672,8 @@ public void shouldRunAfterRunFailureToAcquireConnection() neo4j.startDb(); - StatementResultCursor cursor = getBlocking( session.runAsync( "RETURN 42" ) ); - Record record = getBlocking( cursor.singleAsync() ); + StatementResultCursor cursor2 = getBlocking( session.runAsync( "RETURN 42" ) ); + Record record = getBlocking( cursor2.singleAsync() ); assertEquals( 42, record.get( 0 ).asInt() ); } @@ -703,9 +705,10 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() { neo4j.killDb(); + StatementResultCursor cursor1 = await( session.runAsync( "RETURN 42" ) ); try { - getBlocking( session.runAsync( "RETURN 42" ) ); + getBlocking( cursor1.consumeAsync() ); fail( "Exception expected" ); } catch ( ServiceUnavailableException e ) @@ -716,8 +719,8 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() neo4j.startDb(); Transaction tx = getBlocking( session.beginTransactionAsync() ); - StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN 42" ) ); - Record record = getBlocking( cursor.singleAsync() ); + StatementResultCursor cursor2 = getBlocking( tx.runAsync( "RETURN 42" ) ); + Record record = getBlocking( cursor2.singleAsync() ); assertEquals( 42, record.get( 0 ).asInt() ); assertNull( getBlocking( tx.rollbackAsync() ) ); } @@ -849,6 +852,23 @@ public void shouldPropagateRunFailureWhenClosed() } } + @Test + public void shouldPropagateBlockedRunFailureWhenClosed() + { + getBlocking( session.runAsync( "RETURN 10 / 0" ) ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test public void shouldPropagatePullAllFailureWhenClosed() { @@ -865,6 +885,58 @@ public void shouldPropagatePullAllFailureWhenClosed() } } + @Test + public void shouldPropagateBlockedPullAllFailureWhenClosed() + { + getBlocking( session.runAsync( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" ) ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldCloseCleanlyWhenRunErrorConsumed() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "SomeWrongQuery" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Invalid input" ) ); + } + + assertNull( getBlocking( session.closeAsync() ) ); + } + + @Test + public void shouldCloseCleanlyWhenPullAllErrorConsumed() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "UNWIND range(10, 0, -1) AS x RETURN 1 / x" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + + assertNull( getBlocking( session.closeAsync() ) ); + } + @Test public void shouldBePossibleToConsumeResultAfterSessionIsClosed() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index ab544dde1f..d6e55a60ba 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -227,9 +227,10 @@ public void shouldFailToCommitAfterSingleWrongStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.consumeAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -253,9 +254,10 @@ public void shouldAllowRollbackAfterSingleWrongStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -281,9 +283,10 @@ public void shouldFailToCommitAfterCoupleCorrectAndSingleWrongStatement() assertNotNull( record2 ); assertEquals( 42, record2.get( 0 ).asInt() ); + StatementResultCursor cursor3 = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor3.consumeAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -317,9 +320,11 @@ public void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() assertNotNull( record2 ); assertEquals( 42, record2.get( 0 ).asInt() ); + StatementResultCursor cursor3 = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + // todo: use summaryAsync() + await( cursor3.consumeAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -335,9 +340,10 @@ public void shouldNotAllowNewStatementsAfterAnIncorrectStatement() { Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "RETURN" ) ); try { - await( tx.runAsync( "RETURN" ) ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -954,6 +960,265 @@ public void shouldUpdateSessionBookmarkAfterCommit() assertNotEquals( bookmarkBefore, bookmarkAfter ); } + @Test + public void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws InterruptedException + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "CREATE (:TestNode)" ); + tx.runAsync( "CREATE (:TestNode)" ); + tx.runAsync( "RETURN 10 / 0" ); + tx.runAsync( "CREATE (:TestNode)" ); + + Thread.sleep( 1000 ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "/ by zero", e.getMessage() ); + } + } + + @Test + public void shouldPropagateRunFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "RETURN ILLEGAL" ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "ILLEGAL" ) ); + } + } + + @Test + public void shouldPropagateBlockedRunFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "RETURN 42 / 0" ) ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagateRunFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "RETURN ILLEGAL" ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "ILLEGAL" ) ); + } + } + + @Test + public void shouldPropagateBlockedRunFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "RETURN 42 / 0" ) ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagateBlockedPullAllFailureFromCommit() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ) ); + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagatePullAllFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldPropagateBlockedPullAllFailureFromRollback() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + getBlocking( tx.runAsync( "UNWIND [1, 2, 3, 'Hi'] AS x RETURN 10 / x" ) ); + + try + { + getBlocking( tx.rollbackAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + } + + @Test + public void shouldFailToCommitWhenRunFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Transaction rolled back" ) ); + } + } + + @Test + public void shouldFailToCommitWhenPullAllFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( + "FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "TypeError" ) ); + } + try + { + getBlocking( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), startsWith( "Transaction rolled back" ) ); + } + } + + @Test + public void shouldRollbackWhenRunFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNull( getBlocking( tx.rollbackAsync() ) ); + } + + @Test + public void shouldRollbackWhenPullAllFailureIsConsumed() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + StatementResultCursor cursor = getBlocking( tx.runAsync( "UNWIND [1, 0] AS x RETURN 5 / x" ) ); + + try + { + getBlocking( cursor.consumeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + + assertNull( getBlocking( tx.rollbackAsync() ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java index cc3bfe03e8..a6dc8f64d5 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java @@ -24,11 +24,15 @@ import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResultCursor; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.neo4j.driver.internal.util.Matchers.syntaxError; public class AsyncWrongQuery extends AbstractAsyncQuery { @@ -42,15 +46,19 @@ public CompletionStage execute( C context ) { Session session = newSession( AccessMode.READ, context ); - return session.runAsync( "RETURN" ).handle( ( cursor, error ) -> - { - session.closeAsync(); + return session.runAsync( "RETURN Wrong" ) + .thenCompose( StatementResultCursor::nextAsync ) + .handle( ( record, error ) -> + { + session.closeAsync(); + assertNull( record ); - assertNull( cursor ); - Throwable cause = Futures.completionErrorCause( error ); - assertThat( cause, is( syntaxError( "Unexpected end of input" ) ) ); + Throwable cause = Futures.completionErrorCause( error ); + assertNotNull( cause ); + assertThat( cause, instanceOf( ClientException.class ) ); + assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) ); - return null; - } ); + return null; + } ); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java index 917d6aa8b2..ae2ea4f04d 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java @@ -24,12 +24,16 @@ import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResultCursor; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.neo4j.driver.internal.util.Matchers.syntaxError; public class AsyncWrongQueryInTx extends AbstractAsyncQuery { @@ -44,14 +48,19 @@ public CompletionStage execute( C context ) Session session = newSession( AccessMode.READ, context ); return session.beginTransactionAsync() - .thenCompose( tx -> tx.runAsync( "RETURN" ).handle( ( cursor, error ) -> - { - assertNull( cursor ); - Throwable cause = Futures.completionErrorCause( error ); - assertThat( cause, is( syntaxError( "Unexpected end of input" ) ) ); - - return tx; - } ) ) + .thenCompose( tx -> tx.runAsync( "RETURN Wrong" ) + .thenCompose( StatementResultCursor::nextAsync ) + .handle( ( record, error ) -> + { + assertNull( record ); + + Throwable cause = Futures.completionErrorCause( error ); + assertNotNull( cause ); + assertThat( cause, instanceOf( ClientException.class ) ); + assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) ); + + return tx; + } ) ) .thenCompose( Transaction::rollbackAsync ) .whenComplete( ( ignore, error ) -> session.closeAsync() ); } From c2ea2574e20265bbf20ede9276ed23316999da00 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 18 Oct 2017 14:56:02 +0200 Subject: [PATCH 4/7] Propagate failures when summary is requested Calls `StatementResult#summary()` and `StatementResultCursor#summaryAsync()` should propagate unconsumed query errors. They are in this sense same as `#consume()` and `#consumeAsync()` respectively. Previously summary calls simply ignored existing unconsumed failure. This commit makes them propagate it instead. Note that it's later still possible to access summary after failure is consumed. --- .../handlers/PullAllResponseHandler.java | 33 +++++++++++-------- .../driver/v1/integration/SessionAsyncIT.java | 18 ++++++++++ .../driver/v1/integration/SessionIT.java | 21 ++++++++++++ .../v1/integration/TransactionAsyncIT.java | 23 +++++++++++-- .../driver/v1/integration/TransactionIT.java | 25 ++++++++++++-- 5 files changed, 103 insertions(+), 17 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 6a15353b73..68fd1bd386 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -142,9 +142,7 @@ public synchronized CompletionStage peekAsync() { if ( failure != null ) { - Throwable error = failure; - failure = null; // propagate failure only once - return failedFuture( error ); + return failedFuture( extractFailure() ); } if ( finished ) @@ -166,17 +164,16 @@ public synchronized CompletionStage peekAsync() public synchronized CompletionStage nextAsync() { - return peekAsync().thenApply( record -> - { - dequeueRecord(); - return record; - } ); + return peekAsync().thenApply( ignore -> dequeueRecord() ); } - // todo: propagate failure from here as well public synchronized CompletionStage summaryAsync() { - if ( summary != null ) + if ( failure != null ) + { + return failedFuture( extractFailure() ); + } + else if ( summary != null ) { return completedFuture( summary ); } @@ -194,9 +191,7 @@ public synchronized CompletionStage failureAsync() { if ( failure != null ) { - Throwable error = failure; - failure = null; // propagate failure only once - return completedFuture( error ); + return completedFuture( extractFailure() ); } else if ( finished ) { @@ -237,6 +232,18 @@ private Record dequeueRecord() return record; } + private Throwable extractFailure() + { + if ( failure == null ) + { + throw new IllegalStateException( "Can't consume failure because it does not exist" ); + } + + Throwable error = failure; + failure = null; // propagate failure only once + return error; + } + private void completeRecordFuture( Record record ) { if ( recordFuture != null ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 29ba15f1f8..8889eba1cd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -949,6 +949,24 @@ public void shouldBePossibleToConsumeResultAfterSessionIsClosed() assertEquals( 20000, ints.size() ); } + @Test + public void shouldPropagateFailureFromSummary() + { + StatementResultCursor cursor = getBlocking( session.runAsync( "RETURN Something" ) ); + + try + { + getBlocking( cursor.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( getBlocking( cursor.summaryAsync() ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 0d50874911..3626e0f03f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1282,6 +1282,27 @@ public void shouldBePossibleToConsumeResultAfterSessionIsClosed() assertEquals( 20000, ints.size() ); } + @Test + public void shouldPropagateFailureFromSummary() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "RETURN Wrong" ); + + try + { + result.summary(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( result.summary() ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index d6e55a60ba..d7c65e8ece 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -323,8 +323,7 @@ public void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() StatementResultCursor cursor3 = await( tx.runAsync( "RETURN" ) ); try { - // todo: use summaryAsync() - await( cursor3.consumeAsync() ); + await( cursor3.summaryAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -1219,6 +1218,26 @@ public void shouldRollbackWhenPullAllFailureIsConsumed() assertNull( getBlocking( tx.rollbackAsync() ) ); } + @Test + public void shouldPropagateFailureFromSummary() + { + Transaction tx = getBlocking( session.beginTransactionAsync() ); + + StatementResultCursor cursor = getBlocking( tx.runAsync( "RETURN Wrong" ) ); + + try + { + getBlocking( cursor.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( getBlocking( cursor.summaryAsync() ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index f058485b3a..4f60697008 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.v1.integration; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -35,9 +34,11 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.util.TestNeo4jSession; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -357,9 +358,29 @@ public void shouldRollBackTxIfErrorWithConsume() throws Throwable StatementResult cursor = tx.run( "RETURN 1" ); int val = cursor.single().get( "1" ).asInt(); - Assert.assertThat( val, equalTo( 1 ) ); + assertThat( val, equalTo( 1 ) ); } } + } + + @Test + public void shouldPropagateFailureFromSummary() + { + try ( Transaction tx = session.beginTransaction() ) + { + StatementResult result = tx.run( "RETURN Wrong" ); + try + { + result.summary(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + + assertNotNull( result.summary() ); + } } } From 20dae996e3004901907dadefe134a2e80cf57538 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 18 Oct 2017 23:14:16 +0200 Subject: [PATCH 5/7] Improve error propagation When closing sessions and committing/rolling back transactions. Previously only last async cursor failure was propagated. This means successfully completed cursors could hide not consumed failures from previous queries. This commit makes session and transaction track all results and query them for not consumed errors when session is closed or transaction either committed or rolled back. --- .../driver/internal/ExplicitTransaction.java | 32 ++--- .../neo4j/driver/internal/NetworkSession.java | 22 +-- .../internal/async/ResultCursorsHolder.java | 64 +++++++++ .../async/ResultCursorsHolderTest.java | 134 ++++++++++++++++++ .../driver/v1/integration/SessionAsyncIT.java | 21 +++ .../driver/v1/integration/SessionIT.java | 43 ++++++ 6 files changed, 281 insertions(+), 35 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index f946d4adc7..53222d1f55 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -18,8 +18,6 @@ */ package org.neo4j.driver.internal; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -29,6 +27,7 @@ import org.neo4j.driver.internal.async.InternalStatementResultCursor; import org.neo4j.driver.internal.async.QueryRunner; +import org.neo4j.driver.internal.async.ResultCursorsHolder; import org.neo4j.driver.internal.handlers.BeginTxResponseHandler; import org.neo4j.driver.internal.handlers.CommitTxResponseHandler; import org.neo4j.driver.internal.handlers.NoOpResponseHandler; @@ -36,7 +35,6 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.types.InternalTypeSystem; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -93,8 +91,8 @@ private enum State private final Connection connection; private final NetworkSession session; + private final ResultCursorsHolder resultCursors; - private final List> resultCursors = new ArrayList<>(); private volatile Bookmark bookmark = Bookmark.empty(); private volatile State state = State.ACTIVE; @@ -102,6 +100,7 @@ public ExplicitTransaction( Connection connection, NetworkSession session ) { this.connection = connection; this.session = session; + this.resultCursors = new ResultCursorsHolder(); } public CompletionStage beginAsync( Bookmark initialBookmark ) @@ -178,8 +177,8 @@ else if ( state == State.TERMINATED ) } else { - return receiveFailures() - .thenCompose( failure -> doCommitAsync().handle( handleCommitOrRollback( failure ) ) ) + return resultCursors.retrieveNotConsumedError() + .thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) ) .whenComplete( transactionClosed( State.COMMITTED ) ); } } @@ -203,8 +202,8 @@ else if ( state == State.TERMINATED ) } else { - return receiveFailures() - .thenCompose( failure -> doRollbackAsync().handle( handleCommitOrRollback( failure ) ) ) + return resultCursors.retrieveNotConsumedError() + .thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) ) .whenComplete( transactionClosed( State.ROLLED_BACK ) ); } } @@ -279,17 +278,17 @@ public CompletionStage runAsync( Statement statement ) private CompletionStage run( Statement statement, boolean asAsync ) { ensureCanRunQueries(); - CompletionStage result; + CompletionStage cursorStage; if ( asAsync ) { - result = QueryRunner.runAsAsync( connection, statement, this ); + cursorStage = QueryRunner.runAsAsync( connection, statement, this ); } else { - result = QueryRunner.runAsBlocking( connection, statement, this ); + cursorStage = QueryRunner.runAsBlocking( connection, statement, this ); } - resultCursors.add( result ); - return result; + resultCursors.add( cursorStage ); + return cursorStage; } private void ensureCanRunQueries() @@ -391,11 +390,4 @@ private BiConsumer transactionClosed( State newState ) session.setBookmark( bookmark ); }; } - - private CompletionStage receiveFailures() - { - return resultCursors.stream() - .map( stage -> stage.thenCompose( InternalStatementResultCursor::failureAsync ) ) - .reduce( completedFuture( null ), Futures::firstNotNull ); - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index d8f0efcb28..e61ce516fe 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.internal.async.InternalStatementResultCursor; import org.neo4j.driver.internal.async.QueryRunner; +import org.neo4j.driver.internal.async.ResultCursorsHolder; import org.neo4j.driver.internal.logging.DelegatingLogger; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.Connection; @@ -59,12 +60,12 @@ public class NetworkSession implements Session private final ConnectionProvider connectionProvider; private final AccessMode mode; private final RetryLogic retryLogic; + private final ResultCursorsHolder resultCursors; protected final Logger logger; private volatile Bookmark bookmark = Bookmark.empty(); private volatile CompletionStage transactionStage = completedFuture( null ); private volatile CompletionStage connectionStage = completedFuture( null ); - private volatile CompletionStage lastResultStage = completedFuture( null ); private final AtomicBoolean open = new AtomicBoolean( true ); @@ -74,6 +75,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R this.connectionProvider = connectionProvider; this.mode = mode; this.retryLogic = retryLogic; + this.resultCursors = new ResultCursorsHolder(); this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) ); } @@ -161,9 +163,7 @@ public CompletionStage closeAsync() { if ( open.compareAndSet( true, false ) ) { - return lastResultStage - .exceptionally( error -> null ) // ignore connection acquisition failures - .thenCompose( this::receiveFailure ) + return resultCursors.retrieveNotConsumedError() .thenCompose( error -> releaseResources().thenApply( ignore -> { Throwable queryError = Futures.completionErrorCause( error ); @@ -183,15 +183,6 @@ public CompletionStage closeAsync() return completedFuture( null ); } - private CompletionStage receiveFailure( InternalStatementResultCursor cursor ) - { - if ( cursor == null ) - { - return completedFuture( null ); - } - return cursor.failureAsync(); - } - @Override public Transaction beginTransaction() { @@ -421,7 +412,7 @@ private CompletionStage runAsync( Statement state { ensureSessionIsOpen(); - lastResultStage = ensureNoOpenTxBeforeRunningQuery() + CompletionStage cursorStage = ensureNoOpenTxBeforeRunningQuery() .thenCompose( ignore -> acquireConnection( mode ) ) .thenCompose( connection -> { @@ -435,7 +426,8 @@ private CompletionStage runAsync( Statement state } } ); - return lastResultStage; + resultCursors.add( cursorStage ); + return cursorStage; } private CompletionStage beginTransactionAsync( AccessMode mode ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java new file mode 100644 index 0000000000..819d7f56e2 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletionStage; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class ResultCursorsHolder +{ + private final List> cursorStages = new ArrayList<>(); + + public void add( CompletionStage cursorStage ) + { + Objects.requireNonNull( cursorStage ); + cursorStages.add( cursorStage ); + } + + public CompletionStage retrieveNotConsumedError() + { + return cursorStages.stream() + .map( this::retrieveFailure ) + .reduce( completedFuture( null ), this::nonNullFailureFromEither ); + } + + private CompletionStage retrieveFailure( CompletionStage cursorStage ) + { + return cursorStage + .exceptionally( cursor -> null ) + .thenCompose( cursor -> cursor == null ? completedFuture( null ) : cursor.failureAsync() ); + } + + private CompletionStage nonNullFailureFromEither( CompletionStage stage1, + CompletionStage stage2 ) + { + return stage1.thenCompose( value -> + { + if ( value != null ) + { + return completedFuture( value ); + } + return stage2; + } ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java new file mode 100644 index 0000000000..ade998a6f6 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeoutException; + +import org.neo4j.driver.internal.util.Futures; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.util.Futures.getBlocking; + +public class ResultCursorsHolderTest +{ + @Test + public void shouldReturnNoErrorWhenNoCursorStages() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldFailToAddNullCursorStage() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + try + { + holder.add( null ); + fail( "Exception expected" ); + } + catch ( NullPointerException e ) + { + // expected + } + } + + @Test + public void shouldReturnNoErrorWhenCursorStagesHaveNoErrors() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldNotReturnStageErrors() + { + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( Futures.failedFuture( new RuntimeException( "Failed to acquire a connection" ) ) ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( Futures.failedFuture( new IOException( "Failed to do IO" ) ) ); + + Throwable error = getBlocking( holder.retrieveNotConsumedError() ); + assertNull( error ); + } + + @Test + public void shouldReturnErrorWhenOneCursorFailed() + { + IOException error = new IOException( "IO failed" ); + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithoutError() ); + holder.add( cursorWithError( error ) ); + holder.add( cursorWithoutError() ); + + Throwable retrievedError = getBlocking( holder.retrieveNotConsumedError() ); + assertEquals( error, retrievedError ); + } + + @Test + public void shouldReturnFirstError() + { + RuntimeException error1 = new RuntimeException( "Error 1" ); + IOException error2 = new IOException( "Error 2" ); + TimeoutException error3 = new TimeoutException( "Error 3" ); + ResultCursorsHolder holder = new ResultCursorsHolder(); + + holder.add( cursorWithoutError() ); + holder.add( cursorWithError( error1 ) ); + holder.add( cursorWithError( error2 ) ); + holder.add( cursorWithError( error3 ) ); + + assertEquals( error1, getBlocking( holder.retrieveNotConsumedError() ) ); + } + + private CompletionStage cursorWithoutError() + { + return cursorWithError( null ); + } + + private CompletionStage cursorWithError( Throwable error ) + { + InternalStatementResultCursor cursor = mock( InternalStatementResultCursor.class ); + when( cursor.failureAsync() ).thenReturn( completedFuture( error ) ); + return completedFuture( cursor ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 8889eba1cd..ca8e90d4b9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -967,6 +967,27 @@ public void shouldPropagateFailureFromSummary() assertNotNull( getBlocking( cursor.summaryAsync() ) ); } + @Test + public void shouldPropagateFailureInCloseFromPreviousRun() + { + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "RETURN invalid" ); + session.runAsync( "CREATE ()" ); + session.runAsync( "CREATE ()" ); + + try + { + getBlocking( session.closeAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.code(), containsString( "SyntaxError" ) ); + } + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 3626e0f03f..4d259f62c9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1303,6 +1303,49 @@ public void shouldPropagateFailureFromSummary() } } + @Test + public void shouldThrowFromCloseWhenPreviousErrorNotConsumed() + { + Session session = neo4j.driver().session(); + + session.run( "CREATE ()" ); + session.run( "CREATE ()" ); + session.run( "RETURN 10 / 0" ); + session.run( "CREATE ()" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + } + + @Test + public void shouldCloseCleanlyWhenRunErrorConsumed() + { + Session session = neo4j.driver().session(); + + session.run( "CREATE ()" ); + + try + { + session.run( "RETURN 10 / 0" ).consume(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), containsString( "/ by zero" ) ); + } + session.run( "CREATE ()" ); + + session.close(); + assertFalse( session.isOpen() ); + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); From ab35d362683e26302641949c7909842b2816bf51 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 18 Oct 2017 23:38:51 +0200 Subject: [PATCH 6/7] Small cleanup in SessionIT Replaced anonymous classes with lambdas, made some tests use existing driver instead of creating a new one. --- .../driver/v1/integration/SessionIT.java | 478 +++++++----------- 1 file changed, 183 insertions(+), 295 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 4d259f62c9..430f33bedb 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -120,8 +120,7 @@ public void tearDown() public void shouldKnowSessionIsClosed() throws Throwable { // Given - driver = newDriver(); - Session session = driver.session(); + Session session = neo4j.driver().session(); // When session.close(); @@ -162,13 +161,11 @@ public void shouldKillLongRunningStatement() throws Throwable { neo4j.ensureProcedures( "longRunningStatement.jar" ); // Given - driver = newDriver(); - int executionTimeout = 10; // 10s final int killTimeout = 1; // 1s long startTime = -1, endTime; - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", @@ -200,14 +197,12 @@ public void shouldKillLongStreamingResult() throws Throwable { neo4j.ensureProcedures( "longRunningStatement.jar" ); // Given - driver = newDriver(); - int executionTimeout = 10; // 10s final int killTimeout = 1; // 1s long startTime = -1, endTime; int recordCount = 0; - try ( final Session session = driver.session() ) + try ( final Session session = neo4j.driver().session() ) { StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", parameters( "seconds", executionTimeout ) ); @@ -242,9 +237,8 @@ public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { Transaction tx1 = session.beginTransaction(); @@ -273,9 +267,8 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - Session session = driver.session(); + Session session = neo4j.driver().session(); session.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); Thread.sleep( 1000 ); @@ -294,9 +287,8 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable { // Given neo4j.ensureProcedures( "longRunningStatement.jar" ); - driver = newDriver(); - try ( Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { Transaction tx = session.beginTransaction(); @@ -335,23 +327,19 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable @SuppressWarnings( "deprecation" ) private void resetSessionAfterTimeout( final Session session, final int timeout ) { - new Thread( new Runnable() + new Thread( () -> { - @Override - public void run() + try { - try - { - Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds - } - catch ( InterruptedException e ) - { - e.printStackTrace(); - } - finally - { - session.reset(); // reset the session after timeout - } + Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds + } + catch ( InterruptedException e ) + { + e.printStackTrace(); + } + finally + { + session.reset(); // reset the session after timeout } } ).start(); } @@ -361,8 +349,7 @@ public void run() public void shouldAllowMoreStatementAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { session.run( "Return 1" ).consume(); @@ -380,8 +367,7 @@ public void shouldAllowMoreStatementAfterSessionReset() public void shouldAllowMoreTxAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -406,8 +392,7 @@ public void shouldAllowMoreTxAfterSessionReset() public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -429,8 +414,7 @@ public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() public void shouldAllowMoreTxAfterSessionResetInTx() { // Given - try ( Driver driver = newDriver(); - Session session = driver.session() ) + try ( Session session = neo4j.driver().session() ) { try ( Transaction tx = session.beginTransaction() ) { @@ -653,14 +637,7 @@ public void readTxCommittedWithoutTxSuccess() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() - { - @Override - public Long execute( Transaction tx ) - { - return tx.run( "RETURN 42" ).single().get( 0 ).asLong(); - } - } ); + long answer = session.readTransaction( tx -> tx.run( "RETURN 42" ).single().get( 0 ).asLong() ); assertEquals( 42, answer ); // bookmark should be not-null after commit @@ -675,14 +652,8 @@ public void writeTxCommittedWithoutTxSuccess() { try ( Session session = driver.session() ) { - long answer = session.writeTransaction( new TransactionWork() - { - @Override - public Long execute( Transaction tx ) - { - return tx.run( "CREATE (:Person {name: 'Thor Odinson'}) RETURN 42" ).single().get( 0 ).asLong(); - } - } ); + long answer = session.writeTransaction( tx -> + tx.run( "CREATE (:Person {name: 'Thor Odinson'}) RETURN 42" ).single().get( 0 ).asLong() ); assertEquals( 42, answer ); } @@ -703,15 +674,11 @@ public void readTxRolledBackWithTxFailure() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() + long answer = session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - StatementResult result = tx.run( "RETURN 42" ); - tx.failure(); - return result.single().get( 0 ).asLong(); - } + StatementResult result = tx.run( "RETURN 42" ); + tx.failure(); + return result.single().get( 0 ).asLong(); } ); assertEquals( 42, answer ); @@ -727,15 +694,11 @@ public void writeTxRolledBackWithTxFailure() { try ( Session session = driver.session() ) { - int answer = session.writeTransaction( new TransactionWork() + int answer = session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.failure(); - return 42; - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.failure(); + return 42; } ); assertEquals( 42, answer ); @@ -760,18 +723,14 @@ public void readTxRolledBackWhenExceptionIsThrown() try { - session.readTransaction( new TransactionWork() + session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) + StatementResult result = tx.run( "RETURN 42" ); + if ( result.single().get( 0 ).asLong() == 42 ) { - StatementResult result = tx.run( "RETURN 42" ); - if ( result.single().get( 0 ).asLong() == 42 ) - { - throw new IllegalStateException(); - } - return 1L; + throw new IllegalStateException(); } + return 1L; } ); fail( "Exception expected" ); } @@ -794,14 +753,10 @@ public void writeTxRolledBackWhenExceptionIsThrown() { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Loki Odinson'})" ); - throw new IllegalStateException(); - } + tx.run( "CREATE (:Person {name: 'Loki Odinson'})" ); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -828,16 +783,12 @@ public void readTxRolledBackWhenMarkedBothSuccessAndFailure() assumeBookmarkSupport( driver ); assertNull( session.lastBookmark() ); - long answer = session.readTransaction( new TransactionWork() + long answer = session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - StatementResult result = tx.run( "RETURN 42" ); - tx.success(); - tx.failure(); - return result.single().get( 0 ).asLong(); - } + StatementResult result = tx.run( "RETURN 42" ); + tx.success(); + tx.failure(); + return result.single().get( 0 ).asLong(); } ); assertEquals( 42, answer ); @@ -853,16 +804,12 @@ public void writeTxRolledBackWhenMarkedBothSuccessAndFailure() { try ( Session session = driver.session() ) { - int answer = session.writeTransaction( new TransactionWork() + int answer = session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.success(); - tx.failure(); - return 42; - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.success(); + tx.failure(); + return 42; } ); assertEquals( 42, answer ); @@ -887,15 +834,11 @@ public void readTxRolledBackWhenMarkedAsSuccessAndThrowsException() try { - session.readTransaction( new TransactionWork() + session.readTransaction( tx -> { - @Override - public Long execute( Transaction tx ) - { - tx.run( "RETURN 42" ); - tx.success(); - throw new IllegalStateException(); - } + tx.run( "RETURN 42" ); + tx.success(); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -918,15 +861,11 @@ public void writeTxRolledBackWhenMarkedAsSuccessAndThrowsException() { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Integer execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); - tx.success(); - throw new IllegalStateException(); - } + tx.run( "CREATE (:Person {name: 'Natasha Romanoff'})" ); + tx.success(); + throw new IllegalStateException(); } ); fail( "Exception expected" ); } @@ -996,7 +935,7 @@ public void resetShouldStopWriteTransactionWaitingForALock() throws Exception testResetOfQueryWaitingForLock( new NodeIdUpdater() { @Override - public void performUpdate( Driver driver, final int nodeId, final int newNodeId, + public void performUpdate( Driver driver, int nodeId, int newNodeId, AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception { try ( Session session = driver.session() ) @@ -1004,16 +943,12 @@ public void performUpdate( Driver driver, final int nodeId, final int newNodeId, usedSessionRef.set( session ); latchToWait.await(); - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Void execute( Transaction tx ) - { - invocationsOfWork.incrementAndGet(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - return null; - } + invocationsOfWork.incrementAndGet(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + return null; } ); } } @@ -1036,65 +971,54 @@ public void transactionRunShouldFailOnDeadlocks() throws Exception final CountDownLatch latch1 = new CountDownLatch( 1 ); final CountDownLatch latch2 = new CountDownLatch( 1 ); - try ( final Driver driver = newDriver() ) + Future result1 = executeInDifferentThread( () -> { - Future result1 = executeInDifferentThread( new Callable() + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock first node - updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); - latch1.await(); - latch2.countDown(); + latch1.await(); + latch2.countDown(); - // lock second node - updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); - tx.success(); - } - return null; - } - } ); + tx.success(); + } + return null; + } ); - Future result2 = executeInDifferentThread( new Callable() + Future result2 = executeInDifferentThread( () -> + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock second node - updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); - latch1.countDown(); - latch2.await(); + latch1.countDown(); + latch2.await(); - // lock first node - updateNodeId( tx, nodeId1, newNodeId2 ).consume(); - - tx.success(); - } - return null; - } - } ); + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); - boolean firstResultFailed = assertOneOfTwoFuturesFailWithDeadlock( result1, result2 ); - if ( firstResultFailed ) - { - assertEquals( 0, countNodesWithId( newNodeId1 ) ); - assertEquals( 2, countNodesWithId( newNodeId2 ) ); - } - else - { - assertEquals( 2, countNodesWithId( newNodeId1 ) ); - assertEquals( 0, countNodesWithId( newNodeId2 ) ); + tx.success(); } + return null; + } ); + + boolean firstResultFailed = assertOneOfTwoFuturesFailWithDeadlock( result1, result2 ); + if ( firstResultFailed ) + { + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); + } + else + { + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); } } @@ -1113,95 +1037,80 @@ public void writeTransactionFunctionShouldRetryDeadlocks() throws Exception final CountDownLatch latch1 = new CountDownLatch( 1 ); final CountDownLatch latch2 = new CountDownLatch( 1 ); - try ( final Driver driver = newDriver() ) + Future result1 = executeInDifferentThread( () -> { - Future result1 = executeInDifferentThread( new Callable() + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) { - @Override - public Void call() throws Exception - { - try ( Session session = driver.session(); - Transaction tx = session.beginTransaction() ) - { - // lock first node - updateNodeId( tx, nodeId1, newNodeId1 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId1 ).consume(); - latch1.await(); - latch2.countDown(); + latch1.await(); + latch2.countDown(); - // lock second node - updateNodeId( tx, nodeId2, newNodeId1 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId1 ).consume(); - tx.success(); - } - return null; - } - } ); + tx.success(); + } + return null; + } ); - Future result2 = executeInDifferentThread( new Callable() + Future result2 = executeInDifferentThread( () -> + { + try ( Session session = neo4j.driver().session() ) { - @Override - public Void call() throws Exception + session.writeTransaction( tx -> { - try ( Session session = driver.session() ) - { - session.writeTransaction( new TransactionWork() - { - @Override - public Void execute( Transaction tx ) - { - // lock second node - updateNodeId( tx, nodeId2, newNodeId2 ).consume(); + // lock second node + updateNodeId( tx, nodeId2, newNodeId2 ).consume(); - latch1.countDown(); - await( latch2 ); + latch1.countDown(); + await( latch2 ); - // lock first node - updateNodeId( tx, nodeId1, newNodeId2 ).consume(); + // lock first node + updateNodeId( tx, nodeId1, newNodeId2 ).consume(); - createNodeWithId( nodeId3 ); + createNodeWithId( nodeId3 ); - return null; - } - } ); - } return null; - } - } ); - - boolean firstResultFailed = false; - try - { - // first future may: - // 1) succeed, when it's tx was able to grab both locks and tx in other future was - // terminated because of a deadlock - // 2) fail, when it's tx was terminated because of a deadlock - assertNull( result1.get( 20, TimeUnit.SECONDS ) ); - } - catch ( ExecutionException e ) - { - firstResultFailed = true; + } ); } + return null; + } ); + + boolean firstResultFailed = false; + try + { + // first future may: + // 1) succeed, when it's tx was able to grab both locks and tx in other future was + // terminated because of a deadlock + // 2) fail, when it's tx was terminated because of a deadlock + assertNull( result1.get( 20, TimeUnit.SECONDS ) ); + } + catch ( ExecutionException e ) + { + firstResultFailed = true; + } - // second future can't fail because deadlocks are retried - assertNull( result2.get( 20, TimeUnit.SECONDS ) ); + // second future can't fail because deadlocks are retried + assertNull( result2.get( 20, TimeUnit.SECONDS ) ); - if ( firstResultFailed ) - { - // tx with retries was successful and updated ids - assertEquals( 0, countNodesWithId( newNodeId1 ) ); - assertEquals( 2, countNodesWithId( newNodeId2 ) ); - } - else - { - // tx without retries was successful and updated ids - // tx with retries did not manage to find nodes because their ids were updated - assertEquals( 2, countNodesWithId( newNodeId1 ) ); - assertEquals( 0, countNodesWithId( newNodeId2 ) ); - } - // tx with retries was successful and created an additional node - assertEquals( 1, countNodesWithId( nodeId3 ) ); + if ( firstResultFailed ) + { + // tx with retries was successful and updated ids + assertEquals( 0, countNodesWithId( newNodeId1 ) ); + assertEquals( 2, countNodesWithId( newNodeId2 ) ); } + else + { + // tx without retries was successful and updated ids + // tx with retries did not manage to find nodes because their ids were updated + assertEquals( 2, countNodesWithId( newNodeId1 ) ); + assertEquals( 0, countNodesWithId( newNodeId2 ) ); + } + // tx with retries was successful and created an additional node + assertEquals( 1, countNodesWithId( nodeId3 ) ); } @Test @@ -1367,19 +1276,15 @@ private void testExecuteReadTx( AccessMode sessionMode ) // read previously committed data try ( Session session = driver.session( sessionMode ) ) { - Set names = session.readTransaction( new TransactionWork>() + Set names = session.readTransaction( tx -> { - @Override - public Set execute( Transaction tx ) + List records = tx.run( "MATCH (p:Person) RETURN p.name AS name" ).list(); + Set names1 = new HashSet<>( records.size() ); + for ( Record record : records ) { - List records = tx.run( "MATCH (p:Person) RETURN p.name AS name" ).list(); - Set names = new HashSet<>( records.size() ); - for ( Record record : records ) - { - names.add( record.get( "name" ).asString() ); - } - return names; + names1.add( record.get( "name" ).asString() ); } + return names1; } ); assertThat( names, containsInAnyOrder( "Tony Stark", "Steve Rogers" ) ); @@ -1393,16 +1298,12 @@ private void testExecuteWriteTx( AccessMode sessionMode ) // write some test data try ( Session session = driver.session( sessionMode ) ) { - String material = session.writeTransaction( new TransactionWork() + String material = session.writeTransaction( tx -> { - @Override - public String execute( Transaction tx ) - { - StatementResult result = tx.run( "CREATE (s:Shield {material: 'Vibranium'}) RETURN s" ); - tx.success(); - Record record = result.single(); - return record.get( 0 ).asNode().get( "material" ).asString(); - } + StatementResult result = tx.run( "CREATE (s:Shield {material: 'Vibranium'}) RETURN s" ); + tx.success(); + Record record = result.single(); + return record.get( 0 ).asNode().get( "material" ).asString(); } ); assertEquals( "Vibranium", material ); @@ -1424,17 +1325,13 @@ private void testTxRollbackWhenFunctionThrows( AccessMode sessionMode ) { try { - session.writeTransaction( new TransactionWork() + session.writeTransaction( tx -> { - @Override - public Void execute( Transaction tx ) - { - tx.run( "CREATE (:Person {name: 'Thanos'})" ); - // trigger division by zero error: - tx.run( "UNWIND range(0, 1) AS i RETURN 10/i" ); - tx.success(); - return null; - } + tx.run( "CREATE (:Person {name: 'Thanos'})" ); + // trigger division by zero error: + tx.run( "UNWIND range(0, 1) AS i RETURN 10/i" ); + tx.success(); + return null; } ); fail( "Exception expected" ); } @@ -1450,6 +1347,7 @@ public Void execute( Transaction tx ) Record record = session.run( "MATCH (p:Person {name: 'Thanos'}) RETURN count(p)" ).single(); assertEquals( 0, record.get( 0 ).asInt() ); } + } @SuppressWarnings( "deprecation" ) @@ -1464,11 +1362,10 @@ private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throw CountDownLatch nodeLocked = new CountDownLatch( 1 ); AtomicReference otherSessionRef = new AtomicReference<>(); - try ( Driver driver = newDriver(); - Session session = driver.session(); + try ( Session session = neo4j.driver().session(); Transaction tx = session.beginTransaction() ) { - Future txResult = nodeIdUpdater.update( driver, nodeId, newNodeId1, otherSessionRef, nodeLocked ); + Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); result.consume(); @@ -1503,11 +1400,6 @@ private Driver newDriverWithFixedRetries( int maxRetriesCount ) return driverFactory.newInstance( neo4j.uri(), auth, routingConf, RetrySettings.DEFAULT, noLoggingConfig() ); } - private Driver newDriver() - { - return GraphDatabase.driver( neo4j.uri(), neo4j.authToken(), noLoggingConfig() ); - } - private Driver newDriverWithLimitedRetries( int maxTxRetryTime, TimeUnit unit ) { Config config = Config.build() @@ -1628,17 +1520,13 @@ private static void await( CountDownLatch latch ) private abstract class NodeIdUpdater { - final Future update( final Driver driver, final int nodeId, final int newNodeId, - final AtomicReference usedSessionRef, final CountDownLatch latchToWait ) + final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, + CountDownLatch latchToWait ) { - return executeInDifferentThread( new Callable() + return executeInDifferentThread( () -> { - @Override - public Void call() throws Exception - { - performUpdate( driver, nodeId, newNodeId, usedSessionRef, latchToWait ); - return null; - } + performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); + return null; } ); } From cc6e52058501a82d7a00c716e6fdf65b9c3e6682 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 19 Oct 2017 10:22:56 +0200 Subject: [PATCH 7/7] Couple small cleanups --- .../internal/handlers/PullAllResponseHandler.java | 2 +- .../org/neo4j/driver/internal/util/Futures.java | 14 -------------- .../driver/v1/integration/SessionAsyncIT.java | 1 - .../org/neo4j/driver/v1/integration/SessionIT.java | 3 +-- .../driver/v1/integration/TransactionAsyncIT.java | 2 -- 5 files changed, 2 insertions(+), 20 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 68fd1bd386..e957fcd3a6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -236,7 +236,7 @@ private Throwable extractFailure() { if ( failure == null ) { - throw new IllegalStateException( "Can't consume failure because it does not exist" ); + throw new IllegalStateException( "Can't extract failure because it does not exist" ); } Throwable error = failure; diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java index 25e7c819d2..d7fc6cd012 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java @@ -26,8 +26,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import static java.util.concurrent.CompletableFuture.completedFuture; - public final class Futures { private Futures() @@ -118,16 +116,4 @@ public static Throwable completionErrorCause( Throwable error ) } return error; } - - public static CompletionStage firstNotNull( CompletionStage stage1, CompletionStage stage2 ) - { - return stage1.thenCompose( value -> - { - if ( value != null ) - { - return completedFuture( value ); - } - return stage2; - } ); - } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index ca8e90d4b9..a30eab495a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -726,7 +726,6 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() } @Test -<<<<<<() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index d7c65e8ece..dc908e7b8c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -969,8 +969,6 @@ public void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws Interr tx.runAsync( "RETURN 10 / 0" ); tx.runAsync( "CREATE (:TestNode)" ); - Thread.sleep( 1000 ); - try { getBlocking( tx.commitAsync() );