1919
2020import com .datastax .dse .driver .api .core .DseProtocolVersion ;
2121import com .datastax .dse .driver .api .core .cql .continuous .ContinuousAsyncResultSet ;
22+ import com .datastax .dse .driver .api .core .graph .AsyncGraphResultSet ;
2223import com .datastax .dse .driver .internal .core .DseProtocolFeature ;
2324import com .datastax .dse .driver .internal .core .cql .DseConversions ;
2425import com .datastax .dse .protocol .internal .request .Revise ;
2526import com .datastax .dse .protocol .internal .response .result .DseRowsMetadata ;
2627import com .datastax .oss .driver .api .core .AllNodesFailedException ;
28+ import com .datastax .oss .driver .api .core .AsyncPagingIterable ;
2729import com .datastax .oss .driver .api .core .CqlIdentifier ;
2830import com .datastax .oss .driver .api .core .DriverTimeoutException ;
2931import com .datastax .oss .driver .api .core .NodeUnavailableException ;
@@ -626,7 +628,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
626628 Throwable error = future .cause ();
627629 if (error instanceof EncoderException
628630 && error .getCause () instanceof FrameTooLongException ) {
629- trackNodeError (node , error .getCause ());
631+ trackNodeError (node , error .getCause (), null );
630632 lock .lock ();
631633 try {
632634 abort (error .getCause (), false );
@@ -643,7 +645,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
643645 .getMetricUpdater ()
644646 .incrementCounter (DefaultNodeMetric .UNSENT_REQUESTS , executionProfile .getName ());
645647 recordError (node , error );
646- trackNodeError (node , error .getCause ());
648+ trackNodeError (node , error .getCause (), null );
647649 sendRequest (statement , null , executionIndex , retryCount , scheduleSpeculativeExecution );
648650 }
649651 } else {
@@ -738,7 +740,8 @@ private void onPageTimeout(int expectedPage) {
738740 * Invoked when a continuous paging response is received, either a successful or failed one.
739741 *
740742 * <p>Delegates further processing to appropriate methods: {@link #processResultResponse(Result,
741- * Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't.
743+ * Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it
744+ * wasn't.
742745 *
743746 * @param response the received {@link Frame}.
744747 */
@@ -759,15 +762,15 @@ public void onResponse(@NonNull Frame response) {
759762 processResultResponse ((Result ) responseMessage , response );
760763 } else if (responseMessage instanceof Error ) {
761764 LOG .trace ("[{}] Got error response" , logPrefix );
762- processErrorResponse ((Error ) responseMessage );
765+ processErrorResponse ((Error ) responseMessage , response );
763766 } else {
764767 IllegalStateException error =
765768 new IllegalStateException ("Unexpected response " + responseMessage );
766- trackNodeError (node , error );
769+ trackNodeError (node , error , response );
767770 abort (error , false );
768771 }
769772 } catch (Throwable t ) {
770- trackNodeError (node , t );
773+ trackNodeError (node , t , response );
771774 abort (t , false );
772775 }
773776 } finally {
@@ -901,7 +904,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame
901904 * @param errorMessage the error message received.
902905 */
903906 @ SuppressWarnings ("GuardedBy" ) // this method is only called with the lock held
904- private void processErrorResponse (@ NonNull Error errorMessage ) {
907+ private void processErrorResponse (@ NonNull Error errorMessage , @ NonNull Frame frame ) {
905908 assert lock .isHeldByCurrentThread ();
906909 if (errorMessage instanceof Unprepared ) {
907910 processUnprepared ((Unprepared ) errorMessage );
@@ -910,7 +913,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
910913 if (error instanceof BootstrappingException ) {
911914 LOG .trace ("[{}] {} is bootstrapping, trying next node" , logPrefix , node );
912915 recordError (node , error );
913- trackNodeError (node , error );
916+ trackNodeError (node , error , frame );
914917 sendRequest (statement , null , executionIndex , retryCount , false );
915918 } else if (error instanceof QueryValidationException
916919 || error instanceof FunctionFailureException
@@ -922,7 +925,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
922925 NodeMetricUpdater metricUpdater = ((DefaultNode ) node ).getMetricUpdater ();
923926 metricUpdater .incrementCounter (
924927 DefaultNodeMetric .OTHER_ERRORS , executionProfile .getName ());
925- trackNodeError (node , error );
928+ trackNodeError (node , error , frame );
926929 abort (error , true );
927930 } else {
928931 try {
@@ -1061,7 +1064,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10611064 + "This usually happens when you run a 'USE...' query after "
10621065 + "the statement was prepared." ,
10631066 Bytes .toHexString (idToReprepare ), Bytes .toHexString (repreparedId )));
1064- trackNodeError (node , illegalStateException );
1067+ trackNodeError (node , illegalStateException , null );
10651068 fatalError = illegalStateException ;
10661069 } else {
10671070 LOG .trace (
@@ -1080,18 +1083,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10801083 || prepareError instanceof FunctionFailureException
10811084 || prepareError instanceof ProtocolError ) {
10821085 LOG .trace ("[{}] Unrecoverable error on re-prepare, rethrowing" , logPrefix );
1083- trackNodeError (node , prepareError );
1086+ trackNodeError (node , prepareError , null );
10841087 fatalError = prepareError ;
10851088 }
10861089 }
10871090 } else if (exception instanceof RequestThrottlingException ) {
1088- trackNodeError (node , exception );
1091+ trackNodeError (node , exception , null );
10891092 fatalError = exception ;
10901093 }
10911094 if (fatalError == null ) {
10921095 LOG .trace ("[{}] Re-prepare failed, trying next node" , logPrefix );
10931096 recordError (node , exception );
1094- trackNodeError (node , exception );
1097+ trackNodeError (node , exception , null );
10951098 sendRequest (statement , null , executionIndex , retryCount , false );
10961099 }
10971100 }
@@ -1119,18 +1122,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11191122 switch (verdict .getRetryDecision ()) {
11201123 case RETRY_SAME :
11211124 recordError (node , error );
1122- trackNodeError (node , error );
1125+ trackNodeError (node , error , null );
11231126 sendRequest (
11241127 verdict .getRetryRequest (statement ), node , executionIndex , retryCount + 1 , false );
11251128 break ;
11261129 case RETRY_NEXT :
11271130 recordError (node , error );
1128- trackNodeError (node , error );
1131+ trackNodeError (node , error , null );
11291132 sendRequest (
11301133 verdict .getRetryRequest (statement ), null , executionIndex , retryCount + 1 , false );
11311134 break ;
11321135 case RETHROW :
1133- trackNodeError (node , error );
1136+ trackNodeError (node , error , null );
11341137 abort (error , true );
11351138 break ;
11361139 case IGNORE :
@@ -1443,12 +1446,20 @@ private void reenableAutoReadIfNeeded() {
14431446
14441447 // ERROR HANDLING
14451448
1446- private void trackNodeError (@ NonNull Node node , @ NonNull Throwable error ) {
1449+ private void trackNodeError (
1450+ @ NonNull Node node , @ NonNull Throwable error , @ Nullable Frame frame ) {
14471451 if (nodeErrorReported .compareAndSet (false , true )) {
14481452 long latencyNanos = System .nanoTime () - this .messageStartTimeNanos ;
14491453 context
14501454 .getRequestTracker ()
1451- .onNodeError (this .statement , error , latencyNanos , executionProfile , node , logPrefix );
1455+ .onNodeError (
1456+ this .statement ,
1457+ error ,
1458+ latencyNanos ,
1459+ executionProfile ,
1460+ node ,
1461+ createExecutionInfo (frame ),
1462+ logPrefix );
14521463 }
14531464 }
14541465
@@ -1562,21 +1573,32 @@ private void completeResultSetFuture(
15621573 if (resultSetClass .isInstance (pageOrError )) {
15631574 if (future .complete (resultSetClass .cast (pageOrError ))) {
15641575 throttler .signalSuccess (ContinuousRequestHandlerBase .this );
1576+
1577+ ExecutionInfo executionInfo = null ;
1578+ if (pageOrError instanceof AsyncPagingIterable ) {
1579+ executionInfo = ((AsyncPagingIterable ) pageOrError ).getExecutionInfo ();
1580+ } else if (pageOrError instanceof AsyncGraphResultSet ) {
1581+ executionInfo = ((AsyncGraphResultSet ) pageOrError ).getRequestExecutionInfo ();
1582+ }
1583+
15651584 if (nodeSuccessReported .compareAndSet (false , true )) {
15661585 context
15671586 .getRequestTracker ()
1568- .onNodeSuccess (statement , nodeLatencyNanos , executionProfile , node , logPrefix );
1587+ .onNodeSuccess (
1588+ statement , nodeLatencyNanos , executionProfile , node , executionInfo , logPrefix );
15691589 }
15701590 context
15711591 .getRequestTracker ()
1572- .onSuccess (statement , totalLatencyNanos , executionProfile , node , logPrefix );
1592+ .onSuccess (
1593+ statement , totalLatencyNanos , executionProfile , node , executionInfo , logPrefix );
15731594 }
15741595 } else {
15751596 Throwable error = (Throwable ) pageOrError ;
15761597 if (future .completeExceptionally (error )) {
15771598 context
15781599 .getRequestTracker ()
1579- .onError (statement , error , totalLatencyNanos , executionProfile , node , logPrefix );
1600+ .onError (
1601+ statement , error , totalLatencyNanos , executionProfile , node , null , logPrefix );
15801602 if (error instanceof DriverTimeoutException ) {
15811603 throttler .signalTimeout (ContinuousRequestHandlerBase .this );
15821604 session
@@ -1607,6 +1629,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram
16071629 executionProfile );
16081630 }
16091631
1632+ @ NonNull
1633+ private ExecutionInfo createExecutionInfo (@ Nullable Frame response ) {
1634+ return new DefaultExecutionInfo (
1635+ statement ,
1636+ node ,
1637+ startedSpeculativeExecutionsCount .get (),
1638+ executionIndex ,
1639+ errors ,
1640+ null ,
1641+ response ,
1642+ true ,
1643+ session ,
1644+ context ,
1645+ executionProfile );
1646+ }
1647+
16101648 private void logTimeoutSchedulingError (IllegalStateException timeoutError ) {
16111649 // If we're racing with session shutdown, the timer might be stopped already. We don't want
16121650 // to schedule more executions anyway, so swallow the error.
0 commit comments