2020
2121import java .util .Collections ;
2222import java .util .Map ;
23+ import java .util .concurrent .CompletableFuture ;
24+ import java .util .concurrent .CompletionStage ;
25+ import java .util .function .BiConsumer ;
2326
2427import org .neo4j .driver .ResultResourcesHandler ;
2528import org .neo4j .driver .internal .async .AsyncConnection ;
26- import org .neo4j .driver .internal .async .InternalFuture ;
27- import org .neo4j .driver .internal .async .InternalPromise ;
2829import org .neo4j .driver .internal .async .QueryRunner ;
2930import org .neo4j .driver .internal .handlers .BeginTxResponseHandler ;
3031import org .neo4j .driver .internal .handlers .BookmarkResponseHandler ;
3334import org .neo4j .driver .internal .handlers .RollbackTxResponseHandler ;
3435import org .neo4j .driver .internal .spi .Connection ;
3536import org .neo4j .driver .internal .types .InternalTypeSystem ;
36- import org .neo4j .driver .internal .util .BiConsumer ;
3737import org .neo4j .driver .v1 .Record ;
38- import org .neo4j .driver .v1 .Response ;
3938import org .neo4j .driver .v1 .Statement ;
4039import org .neo4j .driver .v1 .StatementResult ;
4140import org .neo4j .driver .v1 .StatementResultCursor ;
4544import org .neo4j .driver .v1 .exceptions .ClientException ;
4645import org .neo4j .driver .v1 .exceptions .Neo4jException ;
4746import org .neo4j .driver .v1 .types .TypeSystem ;
48- import org .neo4j .driver .v1 .util .Function ;
4947
48+ import static java .util .concurrent .CompletableFuture .completedFuture ;
49+ import static org .neo4j .driver .internal .async .Futures .failedFuture ;
5050import static org .neo4j .driver .internal .util .ErrorUtil .isRecoverable ;
5151import static org .neo4j .driver .v1 .Values .ofValue ;
5252import static org .neo4j .driver .v1 .Values .value ;
@@ -118,25 +118,23 @@ public void begin( Bookmark initialBookmark )
118118 }
119119 }
120120
121- public InternalFuture <ExplicitTransaction > beginAsync ( Bookmark initialBookmark )
121+ public CompletionStage <ExplicitTransaction > beginAsync ( Bookmark initialBookmark )
122122 {
123- InternalPromise <ExplicitTransaction > beginTxPromise = asyncConnection .newPromise ();
124-
125123 Map <String ,Value > parameters = initialBookmark .asBeginTransactionParameters ();
126124 asyncConnection .run ( BEGIN_QUERY , parameters , NoOpResponseHandler .INSTANCE );
127125
128126 if ( initialBookmark .isEmpty () )
129127 {
130128 asyncConnection .pullAll ( NoOpResponseHandler .INSTANCE );
131- beginTxPromise . setSuccess ( this );
129+ return completedFuture ( this );
132130 }
133131 else
134132 {
135- asyncConnection .pullAll ( new BeginTxResponseHandler <>( beginTxPromise , this ) );
133+ CompletableFuture <ExplicitTransaction > beginFuture = new CompletableFuture <>();
134+ asyncConnection .pullAll ( new BeginTxResponseHandler <>( beginFuture , this ) );
136135 asyncConnection .flush ();
136+ return beginFuture ;
137137 }
138-
139- return beginTxPromise ;
140138 }
141139
142140 @ Override
@@ -215,21 +213,15 @@ private void rollbackTx()
215213 }
216214
217215 @ Override
218- public Response <Void > commitAsync ()
219- {
220- return internalCommitAsync ();
221- }
222-
223- InternalFuture <Void > internalCommitAsync ()
216+ public CompletionStage <Void > commitAsync ()
224217 {
225218 if ( state == State .COMMITTED )
226219 {
227- return asyncConnection .< Void > newPromise (). setSuccess ( null );
220+ return completedFuture ( null );
228221 }
229222 else if ( state == State .ROLLED_BACK )
230223 {
231- return asyncConnection .<Void >newPromise ().setFailure (
232- new ClientException ( "Can't commit, transaction has already been rolled back" ) );
224+ return failedFuture ( new ClientException ( "Can't commit, transaction has already been rolled back" ) );
233225 }
234226 else
235227 {
@@ -238,21 +230,15 @@ else if ( state == State.ROLLED_BACK )
238230 }
239231
240232 @ Override
241- public Response <Void > rollbackAsync ()
242- {
243- return internalRollbackAsync ();
244- }
245-
246- InternalFuture <Void > internalRollbackAsync ()
233+ public CompletionStage <Void > rollbackAsync ()
247234 {
248235 if ( state == State .COMMITTED )
249236 {
250- return asyncConnection .<Void >newPromise ()
251- .setFailure ( new ClientException ( "Can't rollback, transaction has already been committed" ) );
237+ return failedFuture ( new ClientException ( "Can't rollback, transaction has already been committed" ) );
252238 }
253239 else if ( state == State .ROLLED_BACK )
254240 {
255- return asyncConnection .< Void > newPromise (). setSuccess ( null );
241+ return completedFuture ( null );
256242 }
257243 else
258244 {
@@ -262,51 +248,39 @@ else if ( state == State.ROLLED_BACK )
262248
263249 private BiConsumer <Void ,Throwable > releaseConnectionAndNotifySession ()
264250 {
265- return new BiConsumer < Void , Throwable >()
251+ return ( ignore , error ) ->
266252 {
267- @ Override
268- public void accept ( Void result , Throwable error )
269- {
270- asyncConnection .release ();
271- session .asyncTransactionClosed ( ExplicitTransaction .this );
272- }
253+ asyncConnection .release ();
254+ session .asyncTransactionClosed ( ExplicitTransaction .this );
273255 };
274256 }
275257
276- private InternalFuture <Void > doCommitAsync ()
258+ private CompletionStage <Void > doCommitAsync ()
277259 {
278- InternalPromise <Void > commitTxPromise = asyncConnection . newPromise ();
260+ CompletableFuture <Void > commitFuture = new CompletableFuture <> ();
279261
280- asyncConnection .run ( COMMIT_QUERY , Collections .< String , Value > emptyMap (), NoOpResponseHandler .INSTANCE );
281- asyncConnection .pullAll ( new CommitTxResponseHandler ( commitTxPromise , this ) );
262+ asyncConnection .run ( COMMIT_QUERY , Collections .emptyMap (), NoOpResponseHandler .INSTANCE );
263+ asyncConnection .pullAll ( new CommitTxResponseHandler ( commitFuture , this ) );
282264 asyncConnection .flush ();
283265
284- return commitTxPromise .thenApply ( new Function < Void , Void >()
266+ return commitFuture .thenApply ( ignore ->
285267 {
286- @ Override
287- public Void apply ( Void ignore )
288- {
289- ExplicitTransaction .this .state = State .COMMITTED ;
290- return null ;
291- }
268+ ExplicitTransaction .this .state = State .COMMITTED ;
269+ return null ;
292270 } );
293271 }
294272
295- private InternalFuture <Void > doRollbackAsync ()
273+ private CompletionStage <Void > doRollbackAsync ()
296274 {
297- InternalPromise <Void > rollbackTxPromise = asyncConnection . newPromise ();
298- asyncConnection .run ( ROLLBACK_QUERY , Collections .< String , Value > emptyMap (), NoOpResponseHandler .INSTANCE );
299- asyncConnection .pullAll ( new RollbackTxResponseHandler ( rollbackTxPromise ) );
275+ CompletableFuture <Void > rollbackFuture = new CompletableFuture <> ();
276+ asyncConnection .run ( ROLLBACK_QUERY , Collections .emptyMap (), NoOpResponseHandler .INSTANCE );
277+ asyncConnection .pullAll ( new RollbackTxResponseHandler ( rollbackFuture ) );
300278 asyncConnection .flush ();
301279
302- return rollbackTxPromise .thenApply ( new Function < Void , Void >()
280+ return rollbackFuture .thenApply ( ignore ->
303281 {
304- @ Override
305- public Void apply ( Void ignore )
306- {
307- ExplicitTransaction .this .state = State .ROLLED_BACK ;
308- return null ;
309- }
282+ ExplicitTransaction .this .state = State .ROLLED_BACK ;
283+ return null ;
310284 } );
311285 }
312286
@@ -317,7 +291,7 @@ public StatementResult run( String statementText, Value statementParameters )
317291 }
318292
319293 @ Override
320- public Response <StatementResultCursor > runAsync ( String statementText , Value parameters )
294+ public CompletionStage <StatementResultCursor > runAsync ( String statementText , Value parameters )
321295 {
322296 return runAsync ( new Statement ( statementText , parameters ) );
323297 }
@@ -329,7 +303,7 @@ public StatementResult run( String statementText )
329303 }
330304
331305 @ Override
332- public Response <StatementResultCursor > runAsync ( String statementTemplate )
306+ public CompletionStage <StatementResultCursor > runAsync ( String statementTemplate )
333307 {
334308 return runAsync ( statementTemplate , Values .EmptyMap );
335309 }
@@ -342,7 +316,8 @@ public StatementResult run( String statementText, Map<String,Object> statementPa
342316 }
343317
344318 @ Override
345- public Response <StatementResultCursor > runAsync ( String statementTemplate , Map <String ,Object > statementParameters )
319+ public CompletionStage <StatementResultCursor > runAsync ( String statementTemplate ,
320+ Map <String ,Object > statementParameters )
346321 {
347322 Value params = statementParameters == null ? Values .EmptyMap : value ( statementParameters );
348323 return runAsync ( statementTemplate , params );
@@ -356,7 +331,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
356331 }
357332
358333 @ Override
359- public Response <StatementResultCursor > runAsync ( String statementTemplate , Record statementParameters )
334+ public CompletionStage <StatementResultCursor > runAsync ( String statementTemplate , Record statementParameters )
360335 {
361336 Value params = statementParameters == null ? Values .EmptyMap : value ( statementParameters .asMap () );
362337 return runAsync ( statementTemplate , params );
@@ -388,7 +363,7 @@ public StatementResult run( Statement statement )
388363 }
389364
390365 @ Override
391- public Response <StatementResultCursor > runAsync ( Statement statement )
366+ public CompletionStage <StatementResultCursor > runAsync ( Statement statement )
392367 {
393368 ensureNotFailed ();
394369 return QueryRunner .runAsync ( asyncConnection , statement , this );
0 commit comments