Skip to content

Commit 48e7371

Browse files
committed
iter
1 parent b09a839 commit 48e7371

File tree

1 file changed

+25
-38
lines changed

1 file changed

+25
-38
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
212212
final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint();
213213
handler.accept(globalCheckPoint);
214214
} else {
215-
if (attempt <= 5) {
215+
if (attempt <= PROCESSOR_RETRY_LIMIT) {
216216
retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler);
217217
} else {
218218
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
@@ -344,6 +344,8 @@ Queue<long[]> getChunks() {
344344

345345
static class ChunkProcessor {
346346

347+
private static final Logger LOGGER = Loggers.getLogger(ChunkProcessor.class);
348+
347349
private final Client leaderClient;
348350
private final Client followerClient;
349351
private final Queue<long[]> chunks;
@@ -394,17 +396,7 @@ public void onResponse(ShardChangesAction.Response response) {
394396

395397
@Override
396398
public void onFailure(Exception e) {
397-
assert e != null;
398-
if (shouldRetry(e)) {
399-
if (canRetry()) {
400-
scheduler.accept(() -> start(from, to, maxTranslogsBytes));
401-
} else {
402-
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
403-
"] times, aborting...", e));
404-
}
405-
} else {
406-
handler.accept(e);
407-
}
399+
retryOrFail(e, () -> start(from, to, maxTranslogsBytes));
408400
}
409401
});
410402
}
@@ -429,39 +421,20 @@ public void onFailure(Exception e) {
429421
protected void doRun() throws Exception {
430422
indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
431423
if (e != null) {
432-
if (shouldRetry(e)) {
433-
if (canRetry()) {
434-
scheduler.accept(() -> handleResponse(to, response));
435-
} else {
436-
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
437-
"] times, aborting...", e));
438-
}
439-
} else {
440-
handler.accept(e);
441-
}
424+
retryOrFail(e, () -> handleResponse(to, response));
442425
return;
443426
}
444427
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
445428
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
446429
new ActionListener<BulkShardOperationsResponse>() {
447-
@Override
448-
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
449-
handler.accept(null);
450-
}
430+
@Override
431+
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
432+
handler.accept(null);
433+
}
451434

452435
@Override
453436
public void onFailure(final Exception e) {
454-
assert e != null;
455-
if (shouldRetry(e)) {
456-
if (canRetry()) {
457-
scheduler.accept(() -> handleResponse(to, response));
458-
} else {
459-
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
460-
"] times, aborting...", e));
461-
}
462-
} else {
463-
handler.accept(e);
464-
}
437+
retryOrFail(e, () -> handleResponse(to, response));
465438
}
466439
}
467440
);
@@ -470,6 +443,20 @@ public void onFailure(final Exception e) {
470443
});
471444
}
472445

446+
void retryOrFail(Exception e, Runnable retryAction) {
447+
assert e != null;
448+
if (shouldRetry(e)) {
449+
if (canRetry()) {
450+
LOGGER.debug(() -> new ParameterizedMessage("{} Retrying [{}]...", leaderShard, retryCounter.get()), e);
451+
scheduler.accept(retryAction);
452+
} else {
453+
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + "] times, aborting...", e));
454+
}
455+
} else {
456+
handler.accept(e);
457+
}
458+
}
459+
473460
boolean shouldRetry(Exception e) {
474461
// TODO: What other exceptions should be retried?
475462
return NetworkExceptionHelper.isConnectException(e) ||
@@ -537,7 +524,7 @@ static final class IndexMetadataVersionChecker implements BiConsumer<Long, Consu
537524

538525
public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
539526
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
540-
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
527+
LOGGER.trace("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
541528
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
542529
handler.accept(null);
543530
} else {

0 commit comments

Comments
 (0)