2020
2121import java .util .Collections ;
2222import java .util .Map ;
23+ import java .util .concurrent .CompletableFuture ;
2324import java .util .concurrent .CompletionStage ;
25+ import java .util .function .BiConsumer ;
2426
2527import org .neo4j .driver .ResultResourcesHandler ;
2628import org .neo4j .driver .internal .async .AsyncConnection ;
27- import org .neo4j .driver .internal .async .InternalFuture ;
28- import org .neo4j .driver .internal .async .InternalPromise ;
2929import org .neo4j .driver .internal .async .QueryRunner ;
3030import org .neo4j .driver .internal .handlers .BeginTxResponseHandler ;
3131import org .neo4j .driver .internal .handlers .BookmarkResponseHandler ;
3434import org .neo4j .driver .internal .handlers .RollbackTxResponseHandler ;
3535import org .neo4j .driver .internal .spi .Connection ;
3636import org .neo4j .driver .internal .types .InternalTypeSystem ;
37- import org .neo4j .driver .internal .util .BiConsumer ;
3837import org .neo4j .driver .v1 .Record ;
3938import org .neo4j .driver .v1 .Statement ;
4039import org .neo4j .driver .v1 .StatementResult ;
4544import org .neo4j .driver .v1 .exceptions .ClientException ;
4645import org .neo4j .driver .v1 .exceptions .Neo4jException ;
4746import org .neo4j .driver .v1 .types .TypeSystem ;
48- import org .neo4j .driver .v1 .util .Function ;
4947
50- import static org .neo4j .driver .internal .async .Futures .asCompletionStage ;
48+ import static java .util .concurrent .CompletableFuture .completedFuture ;
49+ import static org .neo4j .driver .internal .async .Futures .failedFuture ;
5150import static org .neo4j .driver .internal .util .ErrorUtil .isRecoverable ;
5251import static org .neo4j .driver .v1 .Values .ofValue ;
5352import static org .neo4j .driver .v1 .Values .value ;
@@ -119,25 +118,23 @@ public void begin( Bookmark initialBookmark )
119118 }
120119 }
121120
122- public InternalFuture <ExplicitTransaction > beginAsync ( Bookmark initialBookmark )
121+ public CompletionStage <ExplicitTransaction > beginAsync ( Bookmark initialBookmark )
123122 {
124- InternalPromise <ExplicitTransaction > beginTxPromise = asyncConnection .newPromise ();
125-
126123 Map <String ,Value > parameters = initialBookmark .asBeginTransactionParameters ();
127124 asyncConnection .run ( BEGIN_QUERY , parameters , NoOpResponseHandler .INSTANCE );
128125
129126 if ( initialBookmark .isEmpty () )
130127 {
131128 asyncConnection .pullAll ( NoOpResponseHandler .INSTANCE );
132- beginTxPromise . setSuccess ( this );
129+ return completedFuture ( this );
133130 }
134131 else
135132 {
136- asyncConnection .pullAll ( new BeginTxResponseHandler <>( beginTxPromise , this ) );
133+ CompletableFuture <ExplicitTransaction > beginFuture = new CompletableFuture <>();
134+ asyncConnection .pullAll ( new BeginTxResponseHandler <>( beginFuture , this ) );
137135 asyncConnection .flush ();
136+ return beginFuture ;
138137 }
139-
140- return beginTxPromise ;
141138 }
142139
143140 @ Override
@@ -217,20 +214,14 @@ private void rollbackTx()
217214
218215 @ Override
219216 public CompletionStage <Void > commitAsync ()
220- {
221- return asCompletionStage ( internalCommitAsync () );
222- }
223-
224- InternalFuture <Void > internalCommitAsync ()
225217 {
226218 if ( state == State .COMMITTED )
227219 {
228- return asyncConnection .< Void > newPromise (). setSuccess ( null );
220+ return completedFuture ( null );
229221 }
230222 else if ( state == State .ROLLED_BACK )
231223 {
232- return asyncConnection .<Void >newPromise ().setFailure (
233- new ClientException ( "Can't commit, transaction has already been rolled back" ) );
224+ return failedFuture ( new ClientException ( "Can't commit, transaction has already been rolled back" ) );
234225 }
235226 else
236227 {
@@ -240,20 +231,14 @@ else if ( state == State.ROLLED_BACK )
240231
241232 @ Override
242233 public CompletionStage <Void > rollbackAsync ()
243- {
244- return asCompletionStage ( internalRollbackAsync () );
245- }
246-
247- InternalFuture <Void > internalRollbackAsync ()
248234 {
249235 if ( state == State .COMMITTED )
250236 {
251- return asyncConnection .<Void >newPromise ()
252- .setFailure ( new ClientException ( "Can't rollback, transaction has already been committed" ) );
237+ return failedFuture ( new ClientException ( "Can't rollback, transaction has already been committed" ) );
253238 }
254239 else if ( state == State .ROLLED_BACK )
255240 {
256- return asyncConnection .< Void > newPromise (). setSuccess ( null );
241+ return completedFuture ( null );
257242 }
258243 else
259244 {
@@ -263,51 +248,39 @@ else if ( state == State.ROLLED_BACK )
263248
264249 private BiConsumer <Void ,Throwable > releaseConnectionAndNotifySession ()
265250 {
266- return new BiConsumer < Void , Throwable >()
251+ return ( ignore , error ) ->
267252 {
268- @ Override
269- public void accept ( Void result , Throwable error )
270- {
271- asyncConnection .release ();
272- session .asyncTransactionClosed ( ExplicitTransaction .this );
273- }
253+ asyncConnection .release ();
254+ session .asyncTransactionClosed ( ExplicitTransaction .this );
274255 };
275256 }
276257
277- private InternalFuture <Void > doCommitAsync ()
258+ private CompletionStage <Void > doCommitAsync ()
278259 {
279- InternalPromise <Void > commitTxPromise = asyncConnection . newPromise ();
260+ CompletableFuture <Void > commitFuture = new CompletableFuture <> ();
280261
281- asyncConnection .run ( COMMIT_QUERY , Collections .< String , Value > emptyMap (), NoOpResponseHandler .INSTANCE );
282- asyncConnection .pullAll ( new CommitTxResponseHandler ( commitTxPromise , this ) );
262+ asyncConnection .run ( COMMIT_QUERY , Collections .emptyMap (), NoOpResponseHandler .INSTANCE );
263+ asyncConnection .pullAll ( new CommitTxResponseHandler ( commitFuture , this ) );
283264 asyncConnection .flush ();
284265
285- return commitTxPromise .thenApply ( new Function < Void , Void >()
266+ return commitFuture .thenApply ( ignore ->
286267 {
287- @ Override
288- public Void apply ( Void ignore )
289- {
290- ExplicitTransaction .this .state = State .COMMITTED ;
291- return null ;
292- }
268+ ExplicitTransaction .this .state = State .COMMITTED ;
269+ return null ;
293270 } );
294271 }
295272
296- private InternalFuture <Void > doRollbackAsync ()
273+ private CompletionStage <Void > doRollbackAsync ()
297274 {
298- InternalPromise <Void > rollbackTxPromise = asyncConnection . newPromise ();
299- asyncConnection .run ( ROLLBACK_QUERY , Collections .< String , Value > emptyMap (), NoOpResponseHandler .INSTANCE );
300- asyncConnection .pullAll ( new RollbackTxResponseHandler ( rollbackTxPromise ) );
275+ CompletableFuture <Void > rollbackFuture = new CompletableFuture <> ();
276+ asyncConnection .run ( ROLLBACK_QUERY , Collections .emptyMap (), NoOpResponseHandler .INSTANCE );
277+ asyncConnection .pullAll ( new RollbackTxResponseHandler ( rollbackFuture ) );
301278 asyncConnection .flush ();
302279
303- return rollbackTxPromise .thenApply ( new Function < Void , Void >()
280+ return rollbackFuture .thenApply ( ignore ->
304281 {
305- @ Override
306- public Void apply ( Void ignore )
307- {
308- ExplicitTransaction .this .state = State .ROLLED_BACK ;
309- return null ;
310- }
282+ ExplicitTransaction .this .state = State .ROLLED_BACK ;
283+ return null ;
311284 } );
312285 }
313286
@@ -393,7 +366,7 @@ public StatementResult run( Statement statement )
393366 public CompletionStage <StatementResultCursor > runAsync ( Statement statement )
394367 {
395368 ensureNotFailed ();
396- return asCompletionStage ( QueryRunner .runAsync ( asyncConnection , statement , this ) );
369+ return QueryRunner .runAsync ( asyncConnection , statement , this );
397370 }
398371
399372 @ Override
0 commit comments