Skip to content

Commit 96c49e5

Browse files
authored
[CCR] Improve shard follow task's retryable error handling (#33371)
Improve failure handling of retryable errors by retrying remote calls in a exponential backoff like manner. The delay between a retry would not be longer than the configured max retry delay. Also retryable errors will be retried indefinitely. Relates to #30086
1 parent c92ec1c commit 96c49e5

File tree

9 files changed

+92
-172
lines changed

9 files changed

+92
-172
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
5656
PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES);
5757
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
5858
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
59-
PARSER.declareField(Request::setRetryTimeout,
59+
PARSER.declareField(Request::setMaxRetryDelay,
6060
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
61-
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
61+
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
6262
PARSER.declareField(Request::setIdleShardRetryDelay,
6363
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
6464
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -87,7 +87,7 @@ public static Request fromXContent(XContentParser parser, String remoteClusterAl
8787
private Long maxOperationSizeInBytes;
8888
private Integer maxConcurrentWriteBatches;
8989
private Integer maxWriteBufferSize;
90-
private TimeValue retryTimeout;
90+
private TimeValue maxRetryDelay;
9191
private TimeValue idleShardRetryDelay;
9292

9393
@Override
@@ -166,12 +166,12 @@ public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
166166
this.maxWriteBufferSize = maxWriteBufferSize;
167167
}
168168

169-
public TimeValue getRetryTimeout() {
170-
return retryTimeout;
169+
public TimeValue getMaxRetryDelay() {
170+
return maxRetryDelay;
171171
}
172172

173-
public void setRetryTimeout(TimeValue retryTimeout) {
174-
this.retryTimeout = retryTimeout;
173+
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
174+
this.maxRetryDelay = maxRetryDelay;
175175
}
176176

177177
public TimeValue getIdleShardRetryDelay() {
@@ -193,7 +193,7 @@ public void readFrom(StreamInput in) throws IOException {
193193
maxOperationSizeInBytes = in.readOptionalLong();
194194
maxConcurrentWriteBatches = in.readOptionalVInt();
195195
maxWriteBufferSize = in.readOptionalVInt();
196-
retryTimeout = in.readOptionalTimeValue();
196+
maxRetryDelay = in.readOptionalTimeValue();
197197
idleShardRetryDelay = in.readOptionalTimeValue();
198198
}
199199

@@ -208,7 +208,7 @@ public void writeTo(StreamOutput out) throws IOException {
208208
out.writeOptionalLong(maxOperationSizeInBytes);
209209
out.writeOptionalVInt(maxConcurrentWriteBatches);
210210
out.writeOptionalVInt(maxWriteBufferSize);
211-
out.writeOptionalTimeValue(retryTimeout);
211+
out.writeOptionalTimeValue(maxRetryDelay);
212212
out.writeOptionalTimeValue(idleShardRetryDelay);
213213
}
214214

@@ -236,8 +236,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
236236
if (maxConcurrentWriteBatches != null) {
237237
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
238238
}
239-
if (retryTimeout != null) {
240-
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
239+
if (maxRetryDelay != null) {
240+
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
241241
}
242242
if (idleShardRetryDelay != null) {
243243
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
@@ -260,7 +260,7 @@ public boolean equals(Object o) {
260260
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
261261
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
262262
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
263-
Objects.equals(retryTimeout, request.retryTimeout) &&
263+
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
264264
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay);
265265
}
266266

@@ -275,7 +275,7 @@ public int hashCode() {
275275
maxOperationSizeInBytes,
276276
maxConcurrentWriteBatches,
277277
maxWriteBufferSize,
278-
retryTimeout,
278+
maxRetryDelay,
279279
idleShardRetryDelay
280280
);
281281
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.action.support.TransportActions;
13+
import org.elasticsearch.common.Randomness;
1314
import org.elasticsearch.common.logging.Loggers;
1415
import org.elasticsearch.common.transport.NetworkExceptionHelper;
1516
import org.elasticsearch.common.unit.TimeValue;
@@ -18,7 +19,6 @@
1819
import org.elasticsearch.persistent.AllocatedPersistentTask;
1920
import org.elasticsearch.tasks.TaskId;
2021
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
21-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
2222
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2323

2424
import java.util.ArrayList;
@@ -43,11 +43,12 @@
4343
*/
4444
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
4545

46+
private static final int DELAY_MILLIS = 50;
4647
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
4748

4849
private final String leaderIndex;
4950
private final ShardFollowTask params;
50-
private final TimeValue retryTimeout;
51+
private final TimeValue maxRetryDelay;
5152
private final TimeValue idleShardChangesRequestDelay;
5253
private final BiConsumer<TimeValue, Runnable> scheduler;
5354
private final LongSupplier relativeTimeProvider;
@@ -79,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7980
this.params = params;
8081
this.scheduler = scheduler;
8182
this.relativeTimeProvider = relativeTimeProvider;
82-
this.retryTimeout = params.getRetryTimeout();
83+
this.maxRetryDelay = params.getMaxRetryDelay();
8384
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
8485
/*
8586
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
@@ -357,20 +358,28 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
357358

358359
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
359360
assert e != null;
360-
if (shouldRetry(e)) {
361-
if (isStopped() == false && retryCounter.incrementAndGet() <= FollowIndexAction.RETRY_LIMIT) {
362-
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
363-
scheduler.accept(retryTimeout, task);
364-
} else {
365-
markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
366-
"] times, aborting...", e));
367-
}
361+
if (shouldRetry(e) && isStopped() == false) {
362+
int currentRetry = retryCounter.incrementAndGet();
363+
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
364+
params.getFollowShardId(), currentRetry), e);
365+
long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
366+
scheduler.accept(TimeValue.timeValueMillis(delay), task);
368367
} else {
369368
markAsFailed(e);
370369
}
371370
}
372371

373-
private boolean shouldRetry(Exception e) {
372+
static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
373+
// Cap currentRetry to avoid overflow when computing n variable
374+
int maxCurrentRetry = Math.min(currentRetry, 24);
375+
long n = Math.round(Math.pow(2, maxCurrentRetry - 1));
376+
// + 1 here, because nextInt(...) bound is exclusive and otherwise the first delay would always be zero.
377+
int k = Randomness.get().nextInt(Math.toIntExact(n + 1));
378+
int backOffDelay = k * DELAY_MILLIS;
379+
return Math.min(backOffDelay, maxRetryDelayInMillis);
380+
}
381+
382+
private static boolean shouldRetry(Exception e) {
374383
return NetworkExceptionHelper.isConnectException(e) ||
375384
NetworkExceptionHelper.isCloseConnectionException(e) ||
376385
TransportActions.isShardNotAvailableException(e);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
4848
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
4949
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
5050
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
51-
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
51+
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
5252
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
5353

5454
@SuppressWarnings("unchecked")
@@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
7171
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
7272
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
7373
PARSER.declareField(ConstructingObjectParser.constructorArg(),
74-
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
75-
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
74+
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
75+
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
7676
PARSER.declareField(ConstructingObjectParser.constructorArg(),
7777
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
7878
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
8787
private final long maxBatchSizeInBytes;
8888
private final int maxConcurrentWriteBatches;
8989
private final int maxWriteBufferSize;
90-
private final TimeValue retryTimeout;
90+
private final TimeValue maxRetryDelay;
9191
private final TimeValue idleShardRetryDelay;
9292
private final Map<String, String> headers;
9393

9494
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
9595
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
96-
int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
96+
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
9797
this.leaderClusterAlias = leaderClusterAlias;
9898
this.followShardId = followShardId;
9999
this.leaderShardId = leaderShardId;
@@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
102102
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
103103
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
104104
this.maxWriteBufferSize = maxWriteBufferSize;
105-
this.retryTimeout = retryTimeout;
105+
this.maxRetryDelay = maxRetryDelay;
106106
this.idleShardRetryDelay = idleShardRetryDelay;
107107
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
108108
}
@@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
116116
this.maxBatchSizeInBytes = in.readVLong();
117117
this.maxConcurrentWriteBatches = in.readVInt();
118118
this.maxWriteBufferSize = in.readVInt();
119-
this.retryTimeout = in.readTimeValue();
119+
this.maxRetryDelay = in.readTimeValue();
120120
this.idleShardRetryDelay = in.readTimeValue();
121121
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
122122
}
@@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() {
153153
return maxBatchSizeInBytes;
154154
}
155155

156-
public TimeValue getRetryTimeout() {
157-
return retryTimeout;
156+
public TimeValue getMaxRetryDelay() {
157+
return maxRetryDelay;
158158
}
159159

160160
public TimeValue getIdleShardRetryDelay() {
@@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
184184
out.writeVLong(maxBatchSizeInBytes);
185185
out.writeVInt(maxConcurrentWriteBatches);
186186
out.writeVInt(maxWriteBufferSize);
187-
out.writeTimeValue(retryTimeout);
187+
out.writeTimeValue(maxRetryDelay);
188188
out.writeTimeValue(idleShardRetryDelay);
189189
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
190190
}
@@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
210210
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
211211
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
212212
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
213-
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
213+
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
214214
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
215215
builder.field(HEADERS.getPreferredName(), headers);
216216
return builder.endObject();
@@ -229,15 +229,15 @@ public boolean equals(Object o) {
229229
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
230230
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
231231
maxWriteBufferSize == that.maxWriteBufferSize &&
232-
Objects.equals(retryTimeout, that.retryTimeout) &&
232+
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
233233
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
234234
Objects.equals(headers, that.headers);
235235
}
236236

237237
@Override
238238
public int hashCode() {
239239
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
240-
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
240+
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
241241
}
242242

243243
public String toString() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ void start(
175175
request.getMaxOperationSizeInBytes(),
176176
request.getMaxConcurrentWriteBatches(),
177177
request.getMaxWriteBufferSize(),
178-
request.getRetryTimeout(),
178+
request.getMaxRetryDelay(),
179179
request.getIdleShardRetryDelay(),
180180
filteredHeaders);
181181
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
149149
request.getMaxOperationSizeInBytes(),
150150
request.getMaxConcurrentWriteBatches(),
151151
request.getMaxWriteBufferSize(),
152-
request.getRetryTimeout(),
152+
request.getMaxRetryDelay(),
153153
request.getIdleShardRetryDelay()
154154
);
155155
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
131131
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
132132
}
133133
if (randomBoolean()) {
134-
request.setRetryTimeout(TimeValue.timeValueMillis(500));
134+
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
135135
}
136136
if (randomBoolean()) {
137137
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
@@ -162,8 +162,8 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
162162
if (request.getMaxOperationSizeInBytes() != null) {
163163
assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes()));
164164
}
165-
if (request.getRetryTimeout() != null) {
166-
assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout()));
165+
if (request.getMaxRetryDelay() != null) {
166+
assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
167167
}
168168
if (request.getIdleShardRetryDelay() != null) {
169169
assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ protected PutAutoFollowPatternAction.Request createTestInstance() {
4141
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
4242
}
4343
if (randomBoolean()) {
44-
request.setRetryTimeout(TimeValue.timeValueMillis(500));
44+
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
4545
}
4646
if (randomBoolean()) {
4747
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));

0 commit comments

Comments
 (0)