3030import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsResponse ;
3131
3232import java .io .IOException ;
33+ import java .util .AbstractMap ;
3334import java .util .ArrayList ;
3435import java .util .Arrays ;
3536import java .util .Comparator ;
37+ import java .util .LinkedHashMap ;
3638import java .util .List ;
3739import java .util .Map ;
40+ import java .util .NavigableMap ;
3841import java .util .Objects ;
3942import java .util .PriorityQueue ;
4043import java .util .Queue ;
44+ import java .util .TreeMap ;
4145import java .util .concurrent .TimeUnit ;
4246import java .util .concurrent .atomic .AtomicInteger ;
4347import java .util .function .BiConsumer ;
4448import java .util .function .Consumer ;
4549import java .util .function .LongConsumer ;
4650import java .util .function .LongSupplier ;
51+ import java .util .stream .Collectors ;
4752
4853/**
4954 * The node task that fetches the write operations from a leader shard and
@@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
8691 private long numberOfFailedBulkOperations = 0 ;
8792 private long numberOfOperationsIndexed = 0 ;
8893 private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ));
94+ private final LinkedHashMap <Long , ElasticsearchException > fetchExceptions ;
8995
9096 ShardFollowNodeTask (long id , String type , String action , String description , TaskId parentTask , Map <String , String > headers ,
9197 ShardFollowTask params , BiConsumer <TimeValue , Runnable > scheduler , final LongSupplier relativeTimeProvider ) {
@@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
95101 this .relativeTimeProvider = relativeTimeProvider ;
96102 this .retryTimeout = params .getRetryTimeout ();
97103 this .idleShardChangesRequestDelay = params .getIdleShardRetryDelay ();
104+ /*
105+ * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
106+ * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
107+ * when the fetch task associated with that from sequence number succeeds.
108+ */
// Default-constructed LinkedHashMap (insertion-ordered) whose size is bounded through the removeEldestEntry hook.
109+ this .fetchExceptions = new LinkedHashMap <Long , ElasticsearchException >() {
// LinkedHashMap consults this hook after an insertion; answering true makes it drop the eldest entry, so the
// map never keeps more than params.getMaxConcurrentReadBatches() entries.
110+ @ Override
111+ protected boolean removeEldestEntry (final Map .Entry <Long , ElasticsearchException > eldest ) {
112+ return size () > params .getMaxConcurrentReadBatches ();
113+ }
114+ };
98115 }
99116
100117 void start (
@@ -224,6 +241,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
224241 synchronized (ShardFollowNodeTask .this ) {
225242 totalFetchTimeMillis += TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - startTime );
226243 numberOfSuccessfulFetches ++;
244+ fetchExceptions .remove (from );
227245 operationsReceived += response .getOperations ().length ;
228246 totalTransferredBytes += Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::estimateSize ).sum ();
229247 }
@@ -233,6 +251,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
233251 synchronized (ShardFollowNodeTask .this ) {
234252 totalFetchTimeMillis += TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - startTime );
235253 numberOfFailedFetches ++;
254+ fetchExceptions .put (from , new ElasticsearchException (e ));
236255 }
237256 handleFailure (e , retryCounter , () -> sendShardChangesRequest (from , maxOperationCount , maxRequiredSeqNo , retryCounter ));
238257 });
@@ -412,12 +431,13 @@ public synchronized Status getStatus() {
412431 totalIndexTimeMillis ,
413432 numberOfSuccessfulBulkOperations ,
414433 numberOfFailedBulkOperations ,
415- numberOfOperationsIndexed );
434+ numberOfOperationsIndexed ,
435+ new TreeMap <>(fetchExceptions ));
416436 }
417437
418438 public static class Status implements Task .Status {
419439
420- public static final String NAME = "shard-follow-node-task-status" ;
440+ public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status" ;
421441
422442 static final ParseField SHARD_ID = new ParseField ("shard_id" );
423443 static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField ("leader_global_checkpoint" );
@@ -438,8 +458,10 @@ public static class Status implements Task.Status {
438458 static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField ("number_of_successful_bulk_operations" );
439459 static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField ("number_of_failed_bulk_operations" );
440460 static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField ("number_of_operations_indexed" );
461+ static final ParseField FETCH_EXCEPTIONS = new ParseField ("fetch_exceptions" );
441462
442- static final ConstructingObjectParser <Status , Void > PARSER = new ConstructingObjectParser <>(NAME ,
463+ @ SuppressWarnings ("unchecked" )
464+ static final ConstructingObjectParser <Status , Void > STATUS_PARSER = new ConstructingObjectParser <>(STATUS_PARSER_NAME ,
443465 args -> new Status (
444466 (int ) args [0 ],
445467 (long ) args [1 ],
@@ -459,28 +481,51 @@ public static class Status implements Task.Status {
459481 (long ) args [15 ],
460482 (long ) args [16 ],
461483 (long ) args [17 ],
462- (long ) args [18 ]));
484+ (long ) args [18 ],
485+ new TreeMap <>(
486+ ((List <Map .Entry <Long , ElasticsearchException >>) args [19 ])
487+ .stream ()
488+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue )))));
489+
490+ public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry" ;
491+
/**
 * Parser for a single fetch-exceptions entry. The builder lambda turns the two parsed constructor arguments into a
 * map entry: args[0] is cast to long and becomes the key, args[1] is cast to ElasticsearchException and becomes
 * the value.
 */
492+ static final ConstructingObjectParser <Map .Entry <Long , ElasticsearchException >, Void > FETCH_EXCEPTIONS_ENTRY_PARSER =
493+ new ConstructingObjectParser <>(
494+ FETCH_EXCEPTIONS_ENTRY_PARSER_NAME ,
495+ args -> new AbstractMap .SimpleEntry <>((long ) args [0 ], (ElasticsearchException ) args [1 ]));
496+
// Declares the twenty constructor arguments of STATUS_PARSER. The parser's builder lambda reads them by position
// (args[0] .. args[19]), so the order of the declarations below is significant and must not be rearranged.
497+ static {
498+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), SHARD_ID );
499+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
500+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_MAX_SEQ_NO_FIELD );
501+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
502+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_MAX_SEQ_NO_FIELD );
503+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LAST_REQUESTED_SEQ_NO_FIELD );
504+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
505+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
506+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_QUEUED_WRITES_FIELD );
507+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
508+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_FETCH_TIME_MILLIS_FIELD );
509+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD );
510+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_FETCHES_FIELD );
511+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), OPERATIONS_RECEIVED_FIELD );
512+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_TRANSFERRED_BYTES );
513+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_INDEX_TIME_MILLIS_FIELD );
514+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD );
515+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD );
516+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_OPERATIONS_INDEXED_FIELD );
// Final argument (args[19]): the "fetch_exceptions" array, each element parsed by FETCH_EXCEPTIONS_ENTRY_PARSER.
517+ STATUS_PARSER .declareObjectArray (ConstructingObjectParser .constructorArg (), FETCH_EXCEPTIONS_ENTRY_PARSER , FETCH_EXCEPTIONS );
518+ }
519+
520+ static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField ("from_seq_no" );
521+ static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField ("exception" );
463522
464523 static {
465- PARSER .declareInt (ConstructingObjectParser .constructorArg (), SHARD_ID );
466- PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
467- PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_MAX_SEQ_NO_FIELD );
468- PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
469- PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_MAX_SEQ_NO_FIELD );
470- PARSER .declareLong (ConstructingObjectParser .constructorArg (), LAST_REQUESTED_SEQ_NO_FIELD );
471- PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
472- PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
473- PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_QUEUED_WRITES_FIELD );
474- PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
475- PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_FETCH_TIME_MILLIS_FIELD );
476- PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD );
477- PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_FETCHES_FIELD );
478- PARSER .declareLong (ConstructingObjectParser .constructorArg (), OPERATIONS_RECEIVED_FIELD );
479- PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_TRANSFERRED_BYTES );
480- PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_INDEX_TIME_MILLIS_FIELD );
481- PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD );
482- PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD );
483- PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_OPERATIONS_INDEXED_FIELD );
// Constructor arguments of FETCH_EXCEPTIONS_ENTRY_PARSER, in the positional order its builder lambda reads them:
// args[0] is the long "from_seq_no" field; args[1] is the "exception" object, rebuilt with
// ElasticsearchException.fromXContent (the parse context argument c is not used).
524+ FETCH_EXCEPTIONS_ENTRY_PARSER .declareLong (ConstructingObjectParser .constructorArg (), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO );
525+ FETCH_EXCEPTIONS_ENTRY_PARSER .declareObject (
526+ ConstructingObjectParser .constructorArg (),
527+ (p , c ) -> ElasticsearchException .fromXContent (p ),
528+ FETCH_EXCEPTIONS_ENTRY_EXCEPTION );
484529 }
485530
486531 private final int shardId ;
@@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() {
597642 return numberOfOperationsIndexed ;
598643 }
599644
645+ private final NavigableMap <Long , ElasticsearchException > fetchExceptions ;
646+
/**
 * Returns the fetch exceptions carried by this status.
 *
 * @return the map held in the {@code fetchExceptions} field; the field itself is returned, not a copy
 */
647+ public NavigableMap <Long , ElasticsearchException > fetchExceptions () {
648+ return fetchExceptions ;
649+ }
650+
600651 Status (
601652 final int shardId ,
602653 final long leaderGlobalCheckpoint ,
@@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() {
616667 final long totalIndexTimeMillis ,
617668 final long numberOfSuccessfulBulkOperations ,
618669 final long numberOfFailedBulkOperations ,
619- final long numberOfOperationsIndexed ) {
670+ final long numberOfOperationsIndexed ,
671+ final NavigableMap <Long , ElasticsearchException > fetchExceptions ) {
620672 this .shardId = shardId ;
621673 this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
622674 this .leaderMaxSeqNo = leaderMaxSeqNo ;
@@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() {
636688 this .numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations ;
637689 this .numberOfFailedBulkOperations = numberOfFailedBulkOperations ;
638690 this .numberOfOperationsIndexed = numberOfOperationsIndexed ;
691+ this .fetchExceptions = fetchExceptions ;
639692 }
640693
641694 public Status (final StreamInput in ) throws IOException {
@@ -658,11 +711,12 @@ public Status(final StreamInput in) throws IOException {
658711 this .numberOfSuccessfulBulkOperations = in .readVLong ();
659712 this .numberOfFailedBulkOperations = in .readVLong ();
660713 this .numberOfOperationsIndexed = in .readVLong ();
714+ this .fetchExceptions = new TreeMap <>(in .readMap (StreamInput ::readVLong , StreamInput ::readException ));
661715 }
662716
/**
 * Returns the writeable name of this status, which is the status name constant declared on this class
 * (value {@code "shard-follow-node-task-status"}).
 */
663717 @ Override
664718 public String getWriteableName () {
665- return NAME ;
719+ return STATUS_PARSER_NAME ;
666720 }
667721
668722 @ Override
@@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException {
686740 out .writeVLong (numberOfSuccessfulBulkOperations );
687741 out .writeVLong (numberOfFailedBulkOperations );
688742 out .writeVLong (numberOfOperationsIndexed );
743+ out .writeMap (fetchExceptions , StreamOutput ::writeVLong , StreamOutput ::writeException );
689744 }
690745
691746 @ Override
@@ -720,13 +775,30 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
720775 builder .field (NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfSuccessfulBulkOperations );
721776 builder .field (NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfFailedBulkOperations );
722777 builder .field (NUMBER_OF_OPERATIONS_INDEXED_FIELD .getPreferredName (), numberOfOperationsIndexed );
778+ builder .startArray (FETCH_EXCEPTIONS .getPreferredName ());
779+ {
780+ for (final Map .Entry <Long , ElasticsearchException > entry : fetchExceptions .entrySet ()) {
781+ builder .startObject ();
782+ {
783+ builder .field (FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO .getPreferredName (), entry .getKey ());
784+ builder .field (FETCH_EXCEPTIONS_ENTRY_EXCEPTION .getPreferredName ());
785+ builder .startObject ();
786+ {
787+ ElasticsearchException .generateThrowableXContent (builder , params , entry .getValue ());
788+ }
789+ builder .endObject ();
790+ }
791+ builder .endObject ();
792+ }
793+ }
794+ builder .endArray ();
723795 }
724796 builder .endObject ();
725797 return builder ;
726798 }
727799
/**
 * Builds a {@code Status} by applying the status object parser to the given parser. No parse context is supplied:
 * {@code null} is passed as the context argument.
 *
 * @param parser the parser to read the status from
 * @return the status produced by the object parser
 */
728800 public static Status fromXContent (final XContentParser parser ) {
729- return PARSER .apply (parser , null );
801+ return STATUS_PARSER .apply (parser , null );
730802 }
731803
732804 @ Override
0 commit comments