|
18 | 18 | */ |
19 | 19 | package org.neo4j.driver.internal.handlers; |
20 | 20 |
|
21 | | -import java.util.Collections; |
22 | | -import java.util.LinkedList; |
23 | | -import java.util.List; |
| 21 | +import java.util.ArrayDeque; |
24 | 22 | import java.util.Map; |
25 | 23 | import java.util.Queue; |
26 | 24 | import java.util.concurrent.CompletableFuture; |
|
29 | 27 | import org.neo4j.driver.internal.InternalRecord; |
30 | 28 | import org.neo4j.driver.internal.spi.Connection; |
31 | 29 | import org.neo4j.driver.internal.spi.ResponseHandler; |
32 | | -import org.neo4j.driver.internal.summary.InternalNotification; |
33 | | -import org.neo4j.driver.internal.summary.InternalPlan; |
34 | | -import org.neo4j.driver.internal.summary.InternalProfiledPlan; |
35 | | -import org.neo4j.driver.internal.summary.InternalResultSummary; |
36 | | -import org.neo4j.driver.internal.summary.InternalServerInfo; |
37 | | -import org.neo4j.driver.internal.summary.InternalSummaryCounters; |
| 30 | +import org.neo4j.driver.internal.util.MetadataUtil; |
38 | 31 | import org.neo4j.driver.v1.Record; |
39 | 32 | import org.neo4j.driver.v1.Statement; |
40 | 33 | import org.neo4j.driver.v1.Value; |
41 | | -import org.neo4j.driver.v1.summary.Notification; |
42 | | -import org.neo4j.driver.v1.summary.Plan; |
43 | | -import org.neo4j.driver.v1.summary.ProfiledPlan; |
44 | 34 | import org.neo4j.driver.v1.summary.ResultSummary; |
45 | | -import org.neo4j.driver.v1.summary.StatementType; |
46 | 35 |
|
47 | 36 | import static java.util.Collections.emptyMap; |
48 | 37 | import static java.util.Objects.requireNonNull; |
49 | 38 | import static java.util.concurrent.CompletableFuture.completedFuture; |
50 | 39 | import static org.neo4j.driver.internal.util.Futures.failedFuture; |
51 | 40 |
|
52 | | -// todo: unit tests |
53 | 41 | public abstract class PullAllResponseHandler implements ResponseHandler |
54 | 42 | { |
55 | | - private static final boolean TOUCH_AUTO_READ = false; |
| 43 | + static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 ); |
| 44 | + static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 ); |
56 | 45 |
|
57 | 46 | private final Statement statement; |
58 | 47 | private final RunResponseHandler runResponseHandler; |
59 | 48 | protected final Connection connection; |
60 | 49 |
|
61 | | - private final Queue<Record> records = new LinkedList<>(); |
| 50 | + private final Queue<Record> records = new ArrayDeque<>(); |
62 | 51 |
|
63 | 52 | private boolean finished; |
64 | 53 | private Throwable failure; |
@@ -210,25 +199,31 @@ else if ( finished ) |
210 | 199 | private void queueRecord( Record record ) |
211 | 200 | { |
212 | 201 | records.add( record ); |
213 | | - if ( TOUCH_AUTO_READ ) |
| 202 | + |
| 203 | + boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null; |
| 204 | + // when summary or failure is requested we have to buffer all remaining records and then return summary/failure |
| 205 | + // do not disable auto-read in this case, otherwise records will not be consumed and trailing |
| 206 | + // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure |
| 207 | + if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK ) |
214 | 208 | { |
215 | | - if ( records.size() > 10_000 ) |
216 | | - { |
217 | | - connection.disableAutoRead(); |
218 | | - } |
| 209 | + // more than high watermark records are already queued, tell connection to stop auto-reading from network |
| 210 | + // this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are |
| 211 | + // fetched from network faster than consumed |
| 212 | + connection.disableAutoRead(); |
219 | 213 | } |
220 | 214 | } |
221 | 215 |
|
222 | 216 | private Record dequeueRecord() |
223 | 217 | { |
224 | 218 | Record record = records.poll(); |
225 | | - if ( TOUCH_AUTO_READ ) |
| 219 | + |
| 220 | + if ( records.size() < RECORD_BUFFER_LOW_WATERMARK ) |
226 | 221 | { |
227 | | - if ( record != null && records.size() < 100 ) |
228 | | - { |
229 | | - connection.enableAutoRead(); |
230 | | - } |
| 222 | + // less than low watermark records are now available in the buffer, tell connection to pre-fetch more |
| 223 | + // and populate queue with new records from network |
| 224 | + connection.enableAutoRead(); |
231 | 225 | } |
| 226 | + |
232 | 227 | return record; |
233 | 228 | } |
234 | 229 |
|
@@ -302,89 +297,7 @@ private boolean completeFailureFuture( Throwable error ) |
302 | 297 |
|
303 | 298 | private ResultSummary extractResultSummary( Map<String,Value> metadata ) |
304 | 299 | { |
305 | | - InternalServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(), |
306 | | - connection.serverVersion() ); |
307 | | - return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ), |
308 | | - extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ), |
309 | | - extractNotifications( metadata ), runResponseHandler.resultAvailableAfter(), |
310 | | - extractResultConsumedAfter( metadata ) ); |
311 | | - } |
312 | | - |
313 | | - private static StatementType extractStatementType( Map<String,Value> metadata ) |
314 | | - { |
315 | | - Value typeValue = metadata.get( "type" ); |
316 | | - if ( typeValue != null ) |
317 | | - { |
318 | | - return StatementType.fromCode( typeValue.asString() ); |
319 | | - } |
320 | | - return null; |
321 | | - } |
322 | | - |
323 | | - private static InternalSummaryCounters extractCounters( Map<String,Value> metadata ) |
324 | | - { |
325 | | - Value countersValue = metadata.get( "stats" ); |
326 | | - if ( countersValue != null ) |
327 | | - { |
328 | | - return new InternalSummaryCounters( |
329 | | - counterValue( countersValue, "nodes-created" ), |
330 | | - counterValue( countersValue, "nodes-deleted" ), |
331 | | - counterValue( countersValue, "relationships-created" ), |
332 | | - counterValue( countersValue, "relationships-deleted" ), |
333 | | - counterValue( countersValue, "properties-set" ), |
334 | | - counterValue( countersValue, "labels-added" ), |
335 | | - counterValue( countersValue, "labels-removed" ), |
336 | | - counterValue( countersValue, "indexes-added" ), |
337 | | - counterValue( countersValue, "indexes-removed" ), |
338 | | - counterValue( countersValue, "constraints-added" ), |
339 | | - counterValue( countersValue, "constraints-removed" ) |
340 | | - ); |
341 | | - } |
342 | | - return null; |
343 | | - } |
344 | | - |
345 | | - private static int counterValue( Value countersValue, String name ) |
346 | | - { |
347 | | - Value value = countersValue.get( name ); |
348 | | - return value.isNull() ? 0 : value.asInt(); |
349 | | - } |
350 | | - |
351 | | - private static Plan extractPlan( Map<String,Value> metadata ) |
352 | | - { |
353 | | - Value planValue = metadata.get( "plan" ); |
354 | | - if ( planValue != null ) |
355 | | - { |
356 | | - return InternalPlan.EXPLAIN_PLAN_FROM_VALUE.apply( planValue ); |
357 | | - } |
358 | | - return null; |
359 | | - } |
360 | | - |
361 | | - private static ProfiledPlan extractProfiledPlan( Map<String,Value> metadata ) |
362 | | - { |
363 | | - Value profiledPlanValue = metadata.get( "profile" ); |
364 | | - if ( profiledPlanValue != null ) |
365 | | - { |
366 | | - return InternalProfiledPlan.PROFILED_PLAN_FROM_VALUE.apply( profiledPlanValue ); |
367 | | - } |
368 | | - return null; |
369 | | - } |
370 | | - |
371 | | - private static List<Notification> extractNotifications( Map<String,Value> metadata ) |
372 | | - { |
373 | | - Value notificationsValue = metadata.get( "notifications" ); |
374 | | - if ( notificationsValue != null ) |
375 | | - { |
376 | | - return notificationsValue.asList( InternalNotification.VALUE_TO_NOTIFICATION ); |
377 | | - } |
378 | | - return Collections.emptyList(); |
379 | | - } |
380 | | - |
381 | | - private static long extractResultConsumedAfter( Map<String,Value> metadata ) |
382 | | - { |
383 | | - Value resultConsumedAfterValue = metadata.get( "result_consumed_after" ); |
384 | | - if ( resultConsumedAfterValue != null ) |
385 | | - { |
386 | | - return resultConsumedAfterValue.asLong(); |
387 | | - } |
388 | | - return -1; |
| 300 | + long resultAvailableAfter = runResponseHandler.resultAvailableAfter(); |
| 301 | + return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata ); |
389 | 302 | } |
390 | 303 | } |
0 commit comments