Skip to content

Commit a717f44

Browse files
committed
[CCR] Improve shard follow task's retryable error handling
Improve failure handling of retryable errors by retrying remote calls in a exponential backoff like manner. The delay between a retry would not be longer than the configured max retry delay. Also retryable errors will be retried indefinitely. Relates to #30086
1 parent 7f7e8fd commit a717f44

File tree

4 files changed

+61
-152
lines changed

4 files changed

+61
-152
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public static class Request extends ActionRequest implements ToXContentObject {
9898
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES);
9999
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE);
100100
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
101-
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.RETRY_TIMEOUT.getPreferredName()),
102-
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
101+
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()),
102+
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
103103
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
104104
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
105105
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -126,7 +126,7 @@ public static Request fromXContent(XContentParser parser, String followerIndex)
126126
private long maxOperationSizeInBytes;
127127
private int maxConcurrentWriteBatches;
128128
private int maxWriteBufferSize;
129-
private TimeValue retryTimeout;
129+
private TimeValue maxRetryDelay;
130130
private TimeValue idleShardRetryDelay;
131131

132132
public Request(
@@ -137,7 +137,7 @@ public Request(
137137
Long maxOperationSizeInBytes,
138138
Integer maxConcurrentWriteBatches,
139139
Integer maxWriteBufferSize,
140-
TimeValue retryTimeout,
140+
TimeValue maxRetryDelay,
141141
TimeValue idleShardRetryDelay) {
142142

143143
if (leaderIndex == null) {
@@ -161,8 +161,8 @@ public Request(
161161
if (maxWriteBufferSize == null) {
162162
maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE;
163163
}
164-
if (retryTimeout == null) {
165-
retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
164+
if (maxRetryDelay == null) {
165+
maxRetryDelay = ShardFollowNodeTask.DEFAULT_MAX_RETRY_DELAY;
166166
}
167167
if (idleShardRetryDelay == null) {
168168
idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY;
@@ -191,7 +191,7 @@ public Request(
191191
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
192192
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
193193
this.maxWriteBufferSize = maxWriteBufferSize;
194-
this.retryTimeout = retryTimeout;
194+
this.maxRetryDelay = maxRetryDelay;
195195
this.idleShardRetryDelay = idleShardRetryDelay;
196196
}
197197

@@ -225,7 +225,7 @@ public void readFrom(StreamInput in) throws IOException {
225225
maxOperationSizeInBytes = in.readVLong();
226226
maxConcurrentWriteBatches = in.readVInt();
227227
maxWriteBufferSize = in.readVInt();
228-
retryTimeout = in.readOptionalTimeValue();
228+
maxRetryDelay = in.readOptionalTimeValue();
229229
idleShardRetryDelay = in.readOptionalTimeValue();
230230
}
231231

@@ -239,7 +239,7 @@ public void writeTo(StreamOutput out) throws IOException {
239239
out.writeVLong(maxOperationSizeInBytes);
240240
out.writeVInt(maxConcurrentWriteBatches);
241241
out.writeVInt(maxWriteBufferSize);
242-
out.writeOptionalTimeValue(retryTimeout);
242+
out.writeOptionalTimeValue(maxRetryDelay);
243243
out.writeOptionalTimeValue(idleShardRetryDelay);
244244
}
245245

@@ -254,7 +254,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
254254
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
255255
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
256256
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
257-
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
257+
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
258258
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
259259
}
260260
builder.endObject();
@@ -271,7 +271,7 @@ public boolean equals(Object o) {
271271
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
272272
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
273273
maxWriteBufferSize == request.maxWriteBufferSize &&
274-
Objects.equals(retryTimeout, request.retryTimeout) &&
274+
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
275275
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
276276
Objects.equals(leaderIndex, request.leaderIndex) &&
277277
Objects.equals(followerIndex, request.followerIndex);
@@ -287,7 +287,7 @@ public int hashCode() {
287287
maxOperationSizeInBytes,
288288
maxConcurrentWriteBatches,
289289
maxWriteBufferSize,
290-
retryTimeout,
290+
maxRetryDelay,
291291
idleShardRetryDelay
292292
);
293293
}
@@ -412,7 +412,7 @@ void start(
412412
new ShardId(followIndexMetadata.getIndex(), shardId),
413413
new ShardId(leaderIndexMetadata.getIndex(), shardId),
414414
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes,
415-
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout,
415+
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.maxRetryDelay,
416416
request.idleShardRetryDelay, filteredHeaders);
417417
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
418418
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
6161
public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
6262
public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
6363
public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
64-
private static final int RETRY_LIMIT = 10;
65-
public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500);
64+
public static final TimeValue DEFAULT_MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
6665
public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10);
6766

6867
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
6968

7069
private final String leaderIndex;
7170
private final ShardFollowTask params;
72-
private final TimeValue retryTimeout;
71+
private final TimeValue maxRetryDelay;
7372
private final TimeValue idleShardChangesRequestDelay;
7473
private final BiConsumer<TimeValue, Runnable> scheduler;
7574
private final LongSupplier relativeTimeProvider;
@@ -101,7 +100,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
101100
this.params = params;
102101
this.scheduler = scheduler;
103102
this.relativeTimeProvider = relativeTimeProvider;
104-
this.retryTimeout = params.getRetryTimeout();
103+
this.maxRetryDelay = params.getMaxRetryDelay();
105104
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
106105
/*
107106
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
@@ -379,20 +378,22 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
379378

380379
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
381380
assert e != null;
382-
if (shouldRetry(e)) {
383-
if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) {
384-
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
385-
scheduler.accept(retryTimeout, task);
386-
} else {
387-
markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
388-
"] times, aborting...", e));
389-
}
381+
if (shouldRetry(e) && isStopped() == false) {
382+
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
383+
int currentRetry = retryCounter.incrementAndGet();
384+
long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
385+
scheduler.accept(TimeValue.timeValueMillis(delay), task);
390386
} else {
391387
markAsFailed(e);
392388
}
393389
}
394390

395-
private boolean shouldRetry(Exception e) {
391+
static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
392+
long expectedBackOff = Math.round(10 * Math.exp(0.8d * currentRetry) - 1);
393+
return Math.min(expectedBackOff, maxRetryDelayInMillis);
394+
}
395+
396+
private static boolean shouldRetry(Exception e) {
396397
return NetworkExceptionHelper.isConnectException(e) ||
397398
NetworkExceptionHelper.isCloseConnectionException(e) ||
398399
TransportActions.isShardNotAvailableException(e);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
4848
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
4949
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
5050
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
51-
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
51+
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
5252
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
5353

5454
@SuppressWarnings("unchecked")
@@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
7171
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
7272
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
7373
PARSER.declareField(ConstructingObjectParser.constructorArg(),
74-
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
75-
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
74+
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
75+
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
7676
PARSER.declareField(ConstructingObjectParser.constructorArg(),
7777
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
7878
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
8787
private final long maxBatchSizeInBytes;
8888
private final int maxConcurrentWriteBatches;
8989
private final int maxWriteBufferSize;
90-
private final TimeValue retryTimeout;
90+
private final TimeValue maxRetryDelay;
9191
private final TimeValue idleShardRetryDelay;
9292
private final Map<String, String> headers;
9393

9494
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
9595
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
96-
int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
96+
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
9797
this.leaderClusterAlias = leaderClusterAlias;
9898
this.followShardId = followShardId;
9999
this.leaderShardId = leaderShardId;
@@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
102102
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
103103
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
104104
this.maxWriteBufferSize = maxWriteBufferSize;
105-
this.retryTimeout = retryTimeout;
105+
this.maxRetryDelay = maxRetryDelay;
106106
this.idleShardRetryDelay = idleShardRetryDelay;
107107
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
108108
}
@@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
116116
this.maxBatchSizeInBytes = in.readVLong();
117117
this.maxConcurrentWriteBatches = in.readVInt();
118118
this.maxWriteBufferSize = in.readVInt();
119-
this.retryTimeout = in.readTimeValue();
119+
this.maxRetryDelay = in.readTimeValue();
120120
this.idleShardRetryDelay = in.readTimeValue();
121121
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
122122
}
@@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() {
153153
return maxBatchSizeInBytes;
154154
}
155155

156-
public TimeValue getRetryTimeout() {
157-
return retryTimeout;
156+
public TimeValue getMaxRetryDelay() {
157+
return maxRetryDelay;
158158
}
159159

160160
public TimeValue getIdleShardRetryDelay() {
@@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
184184
out.writeVLong(maxBatchSizeInBytes);
185185
out.writeVInt(maxConcurrentWriteBatches);
186186
out.writeVInt(maxWriteBufferSize);
187-
out.writeTimeValue(retryTimeout);
187+
out.writeTimeValue(maxRetryDelay);
188188
out.writeTimeValue(idleShardRetryDelay);
189189
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
190190
}
@@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
210210
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
211211
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
212212
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
213-
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
213+
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
214214
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
215215
builder.field(HEADERS.getPreferredName(), headers);
216216
return builder.endObject();
@@ -229,15 +229,15 @@ public boolean equals(Object o) {
229229
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
230230
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
231231
maxWriteBufferSize == that.maxWriteBufferSize &&
232-
Objects.equals(retryTimeout, that.retryTimeout) &&
232+
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
233233
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
234234
Objects.equals(headers, that.headers);
235235
}
236236

237237
@Override
238238
public int hashCode() {
239239
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
240-
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
240+
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
241241
}
242242

243243
public String toString() {

0 commit comments

Comments
 (0)