@@ -10,11 +10,13 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.NoShardAvailableActionException;
+import org.elasticsearch.action.UnavailableShardsException;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
@@ -31,17 +33,19 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.ActionTransportException;
 import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -58,6 +62,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
@@ -141,7 +146,8 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas
             // TODO: check if both indices have the same history uuid
             if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
                 logger.debug("{} no write operations to fetch", followerShard);
-                retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
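+                // Leader and follower are in sync: schedule another prepare() after the retry timeout instead of failing.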
+                retry(() -> prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker),
+                    task::markAsFailed);
             } else {
                 assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
                     "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@@ -156,34 +162,47 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas
                         task.markAsFailed(e);
                     }
                 };
-                ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
-                    params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
-                    followerShard, handler);
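+                // Expose retry() as a scheduler so chunk processors can re-run work after the retry timeout;
+                // scheduling failures are funneled into the same completion handler.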
+                Consumer<Runnable> scheduler = scheduleTask -> retry(scheduleTask, handler);
+                ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, scheduler, ccrExecutor,
+                    imdVersionChecker, params.getMaxChunkSize(), params.getNumConcurrentChunks(),
+                    params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler, task::isRunning);
                 coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
                 coordinator.start();
             }
         }, task::markAsFailed);
     }

-    private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
-                       long followGlobalCheckPoint,
-                       IndexMetadataVersionChecker imdVersionChecker) {
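+    // Runs the given task on the CCR thread pool after RETRY_TIMEOUT; scheduling failures go to errorHandler.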
+    private void retry(Runnable task, Consumer<Exception> errorHandler) {
         threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
             @Override
             public void onFailure(Exception e) {
-                task.markAsFailed(e);
+                errorHandler.accept(e);
             }

             @Override
             protected void doRun() throws Exception {
-                prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
+                task.run();
             }
         });
     }

     private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
+        fetchGlobalCheckpoint(client, shardId, handler, errorHandler, 0);
+    }
+
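+    // Retries a bounded number of times, because stats for the index or its primary shard may not be
+    // available yet (for example, while the shard is still initializing).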
+    private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler,
+                                       int attempt) {
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
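+            // getIndex(...) returns null when the response does not contain stats for this index yet.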
+            if (indexStats == null) {
+                if (attempt <= 5) {
+                    retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler);
+                } else {
+                    errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
+                }
+                return;
+            }
+
             Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
                 .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
                 .filter(shardStats -> shardStats.getShardRouting().primary())
@@ -193,7 +212,11 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
                 final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint();
                 handler.accept(globalCheckPoint);
             } else {
-                errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
+                if (attempt <= 5) {
+                    retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler);
+                } else {
+                    errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
+                }
             }
         }, errorHandler));
     }
@@ -213,16 +236,28 @@ static class ChunksCoordinator {
         private final ShardId leaderShard;
         private final ShardId followerShard;
         private final Consumer<Exception> handler;
+        private final BooleanSupplier isRunning;
+        private final Consumer<Runnable> scheduler;

         private final CountDown countDown;
         private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
         private final AtomicReference<Exception> failureHolder = new AtomicReference<>();

-        ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
-                          long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
-                          ShardId followerShard, Consumer<Exception> handler) {
+        ChunksCoordinator(Client followerClient,
+                          Client leaderClient,
+                          Consumer<Runnable> scheduler,
+                          Executor ccrExecutor,
+                          IndexMetadataVersionChecker imdVersionChecker,
+                          long batchSize,
+                          int concurrentProcessors,
+                          long processorMaxTranslogBytes,
+                          ShardId leaderShard,
+                          ShardId followerShard,
+                          Consumer<Exception> handler,
+                          BooleanSupplier isRunning) {
             this.followerClient = followerClient;
             this.leaderClient = leaderClient;
+            this.scheduler = scheduler;
             this.ccrExecutor = ccrExecutor;
             this.imdVersionChecker = imdVersionChecker;
             this.batchSize = batchSize;
@@ -231,6 +266,7 @@ static class ChunksCoordinator {
             this.leaderShard = leaderShard;
             this.followerShard = followerShard;
             this.handler = handler;
+            this.isRunning = isRunning;
             this.countDown = new CountDown(concurrentProcessors);
         }

@@ -285,8 +321,8 @@ void processNextChunk() {
                     postProcessChuck(e);
                 }
             };
-            ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
-                leaderShard, followerShard, processorHandler);
+            ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, scheduler, chunks, ccrExecutor,
+                imdVersionChecker, leaderShard, followerShard, processorHandler, isRunning);
             processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
         }

@@ -313,23 +349,34 @@ static class ChunkProcessor {
         private final Queue<long[]> chunks;
         private final Executor ccrExecutor;
         private final BiConsumer<Long, Consumer<Exception>> indexVersionChecker;
+        private final BooleanSupplier isRunning;
+        private final Consumer<Runnable> scheduler;

         private final ShardId leaderShard;
         private final ShardId followerShard;
         private final Consumer<Exception> handler;
         final AtomicInteger retryCounter = new AtomicInteger(0);

-        ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
+        ChunkProcessor(Client leaderClient,
+                       Client followerClient,
+                       Consumer<Runnable> scheduler,
+                       Queue<long[]> chunks,
+                       Executor ccrExecutor,
                        BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
-                       ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
+                       ShardId leaderShard,
+                       ShardId followerShard,
+                       Consumer<Exception> handler,
+                       BooleanSupplier isRunning) {
             this.leaderClient = leaderClient;
             this.followerClient = followerClient;
+            this.scheduler = scheduler;
             this.chunks = chunks;
             this.ccrExecutor = ccrExecutor;
             this.indexVersionChecker = indexVersionChecker;
             this.leaderShard = leaderShard;
             this.followerShard = followerShard;
             this.handler = handler;
+            this.isRunning = isRunning;
         }

         void start(final long from, final long to, final long maxTranslogsBytes) {
@@ -349,8 +396,8 @@ public void onResponse(ShardChangesAction.Response response) {
                 public void onFailure(Exception e) {
                     assert e != null;
                     if (shouldRetry(e)) {
-                        if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
-                            start(from, to, maxTranslogsBytes);
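+                        // Defer the retry through the scheduler so it runs after the retry timeout instead of immediately.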
+                        if (canRetry()) {
+                            scheduler.accept(() -> start(from, to, maxTranslogsBytes));
                         } else {
                             handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
                                 "] times, aborting...", e));
@@ -382,11 +429,15 @@ public void onFailure(Exception e) {
                 protected void doRun() throws Exception {
                     indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
                         if (e != null) {
-                            if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
-                                handleResponse(to, response);
-                            } else {
-                                handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
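+                            // A failed index metadata version check may be transient; retry handling the same
+                            // response while the retry budget allows it, otherwise surface the error.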
+                            if (shouldRetry(e)) {
+                                if (canRetry()) {
+                                    scheduler.accept(() -> handleResponse(to, response));
+                                } else {
+                                    handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
                                     "] times, aborting...", e));
+                                }
+                            } else {
+                                handler.accept(e);
                             }
                             return;
                         }
@@ -400,10 +451,17 @@ public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResp

                             @Override
                             public void onFailure(final Exception e) {
-                                // No retry mechanism here, because if a failure is being redirected to this place it is considered
-                                // non recoverable.
                                 assert e != null;
-                                handler.accept(e);
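+                                // Bulk write failures on the follower may also be transient (for example, when no
+                                // shard copy is available), so route them through the same retry path as the fetch side.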
+                                if (shouldRetry(e)) {
+                                    if (canRetry()) {
+                                        scheduler.accept(() -> handleResponse(to, response));
+                                    } else {
+                                        handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
+                                            "] times, aborting...", e));
+                                    }
+                                } else {
+                                    handler.accept(e);
+                                }
                             }
                         }
                     );
@@ -415,7 +473,15 @@ public void onFailure(final Exception e) {
         boolean shouldRetry(Exception e) {
             // TODO: What other exceptions should be retried?
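+            // Connection-level failures and node/shard-unavailable responses are treated as transient and retried.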
             return NetworkExceptionHelper.isConnectException(e) ||
-                NetworkExceptionHelper.isCloseConnectionException(e);
+                NetworkExceptionHelper.isCloseConnectionException(e) ||
+                e instanceof ActionTransportException ||
+                e instanceof NodeClosedException ||
+                e instanceof UnavailableShardsException ||
+                e instanceof NoShardAvailableActionException;
+        }
+
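+        // A retry is allowed only while the task is still running and the retry budget is not exhausted.
+        // Note that this increments the counter as a side effect, so call it once per attempt.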
+        boolean canRetry() {
+            return isRunning.getAsBoolean() && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT;
         }

     }