2424import java .util .concurrent .CompletableFuture ;
2525import java .util .concurrent .CompletionStage ;
2626import java .util .function .Consumer ;
27+ import java .util .function .Function ;
2728
2829import org .neo4j .driver .internal .handlers .PullAllResponseHandler ;
2930import org .neo4j .driver .internal .handlers .RunResponseHandler ;
3031import org .neo4j .driver .v1 .Record ;
3132import org .neo4j .driver .v1 .StatementResultCursor ;
33+ import org .neo4j .driver .v1 .exceptions .NoSuchRecordException ;
3234import org .neo4j .driver .v1 .summary .ResultSummary ;
3335
3436import static java .util .Objects .requireNonNull ;
@@ -85,18 +87,54 @@ public CompletionStage<Record> peekAsync()
8587 }
8688
8789 @ Override
88- public CompletionStage <Void > forEachAsync ( Consumer <Record > action )
90+ public CompletionStage <Record > singleAsync ()
91+ {
92+ return nextAsync ().thenCompose ( firstRecord ->
93+ {
94+ if ( firstRecord == null )
95+ {
96+ throw new NoSuchRecordException ( "Cannot retrieve a single record, because this cursor is empty." );
97+ }
98+ return nextAsync ().thenApply ( secondRecord ->
99+ {
100+ if ( secondRecord != null )
101+ {
102+ throw new NoSuchRecordException ( "Expected a cursor with a single record, but this cursor " +
103+ "contains at least one more. Ensure your query returns only " +
104+ "one record." );
105+ }
106+ return firstRecord ;
107+ } );
108+ } );
109+ }
110+
111+ @ Override
112+ public CompletionStage <ResultSummary > consumeAsync ()
113+ {
114+ return forEachAsync ( record ->
115+ {
116+ } );
117+ }
118+
119+ @ Override
120+ public CompletionStage <ResultSummary > forEachAsync ( Consumer <Record > action )
89121 {
90122 CompletableFuture <Void > resultFuture = new CompletableFuture <>();
91123 internalForEachAsync ( action , resultFuture );
92- return resultFuture ;
124+ return resultFuture . thenCompose ( ignore -> summaryAsync () ) ;
93125 }
94126
95127 @ Override
96128 public CompletionStage <List <Record >> listAsync ()
97129 {
98- CompletableFuture <List <Record >> resultFuture = new CompletableFuture <>();
99- internalListAsync ( new ArrayList <>(), resultFuture );
130+ return listAsync ( Function .identity () );
131+ }
132+
133+ @ Override
134+ public <T > CompletionStage <List <T >> listAsync ( Function <Record ,T > mapFunction )
135+ {
136+ CompletableFuture <List <T >> resultFuture = new CompletableFuture <>();
137+ internalListAsync ( new ArrayList <>(), resultFuture , mapFunction );
100138 return resultFuture ;
101139 }
102140
@@ -114,7 +152,15 @@ private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Vo
114152 }
115153 else if ( record != null )
116154 {
117- action .accept ( record );
155+ try
156+ {
157+ action .accept ( record );
158+ }
159+ catch ( Throwable actionError )
160+ {
161+ resultFuture .completeExceptionally ( actionError );
162+ return ;
163+ }
118164 internalForEachAsync ( action , resultFuture );
119165 }
120166 else
@@ -124,7 +170,8 @@ else if ( record != null )
124170 } );
125171 }
126172
127- private void internalListAsync ( List <Record > records , CompletableFuture <List <Record >> resultFuture )
173+ private <T > void internalListAsync ( List <T > result , CompletableFuture <List <T >> resultFuture ,
174+ Function <Record ,T > mapFunction )
128175 {
129176 CompletionStage <Record > recordFuture = nextAsync ();
130177
@@ -138,12 +185,22 @@ private void internalListAsync( List<Record> records, CompletableFuture<List<Rec
138185 }
139186 else if ( record != null )
140187 {
141- records .add ( record );
142- internalListAsync ( records , resultFuture );
188+ T value ;
189+ try
190+ {
191+ value = mapFunction .apply ( record );
192+ }
193+ catch ( Throwable mapError )
194+ {
195+ resultFuture .completeExceptionally ( mapError );
196+ return ;
197+ }
198+ result .add ( value );
199+ internalListAsync ( result , resultFuture , mapFunction );
143200 }
144201 else
145202 {
146- resultFuture .complete ( records );
203+ resultFuture .complete ( result );
147204 }
148205 } );
149206 }
0 commit comments