55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import org .apache .logging .log4j .message .ParameterizedMessage ;
89import org .elasticsearch .action .ActionListener ;
910import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
1011import org .elasticsearch .action .admin .indices .mapping .put .PutMappingRequest ;
2122import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
2223import org .elasticsearch .common .xcontent .XContentType ;
2324import org .elasticsearch .index .Index ;
25+ import org .elasticsearch .index .IndexNotFoundException ;
2426import org .elasticsearch .index .seqno .SeqNoStats ;
2527import org .elasticsearch .index .shard .ShardId ;
28+ import org .elasticsearch .index .shard .ShardNotFoundException ;
2629import org .elasticsearch .index .translog .Translog ;
2730import org .elasticsearch .persistent .AllocatedPersistentTask ;
2831import org .elasticsearch .persistent .PersistentTaskState ;
@@ -164,9 +167,24 @@ interface BiLongConsumer {
164167 protected void nodeOperation (final AllocatedPersistentTask task , final ShardFollowTask params , final PersistentTaskState state ) {
165168 Client followerClient = wrapClient (client , params .getHeaders ());
166169 ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask ) task ;
167- logger .info ("{} Started to track leader shard {}" , params .getFollowShardId (), params .getLeaderShardId ());
168- fetchGlobalCheckpoint (followerClient , params .getFollowShardId (),
169- (followerGCP , maxSeqNo ) -> shardFollowNodeTask .start (followerGCP , maxSeqNo , followerGCP , maxSeqNo ), task ::markAsFailed );
170+ logger .info ("{} Starting to track leader shard {}" , params .getFollowShardId (), params .getLeaderShardId ());
171+
172+ BiLongConsumer handler = (followerGCP , maxSeqNo ) -> shardFollowNodeTask .start (followerGCP , maxSeqNo , followerGCP , maxSeqNo );
173+ Consumer <Exception > errorHandler = e -> {
174+ if (shardFollowNodeTask .isStopped ()) {
175+ return ;
176+ }
177+
178+ if (ShardFollowNodeTask .shouldRetry (e )) {
179+ logger .debug (new ParameterizedMessage ("failed to fetch follow shard global {} checkpoint and max sequence number" ,
180+ shardFollowNodeTask ), e );
181+ threadPool .schedule (params .getMaxRetryDelay (), Ccr .CCR_THREAD_POOL_NAME , () -> nodeOperation (task , params , state ));
182+ } else {
183+ shardFollowNodeTask .markAsFailed (e );
184+ }
185+ };
186+
187+ fetchGlobalCheckpoint (followerClient , params .getFollowShardId (), handler , errorHandler );
170188 }
171189
172190 private void fetchGlobalCheckpoint (
@@ -176,6 +194,11 @@ private void fetchGlobalCheckpoint(
176194 final Consumer <Exception > errorHandler ) {
177195 client .admin ().indices ().stats (new IndicesStatsRequest ().indices (shardId .getIndexName ()), ActionListener .wrap (r -> {
178196 IndexStats indexStats = r .getIndex (shardId .getIndexName ());
197+ if (indexStats == null ) {
198+ errorHandler .accept (new IndexNotFoundException (shardId .getIndex ()));
199+ return ;
200+ }
201+
179202 Optional <ShardStats > filteredShardStats = Arrays .stream (indexStats .getShards ())
180203 .filter (shardStats -> shardStats .getShardRouting ().shardId ().equals (shardId ))
181204 .filter (shardStats -> shardStats .getShardRouting ().primary ())
@@ -186,7 +209,7 @@ private void fetchGlobalCheckpoint(
186209 final long maxSeqNo = seqNoStats .getMaxSeqNo ();
187210 handler .accept (globalCheckpoint , maxSeqNo );
188211 } else {
189- errorHandler .accept (new IllegalArgumentException ( "Cannot find shard stats for shard " + shardId ));
212+ errorHandler .accept (new ShardNotFoundException ( shardId ));
190213 }
191214 }, errorHandler ));
192215 }
0 commit comments