Skip to content

Commit c8905da

Browse files
committed
Prepare ShardFollowNodeTask to bootstrap when it fall behind leader shard (#37562)
* Changed `LuceneSnapshot` to throw an `OperationsMissingException` if the requested ops are missing. * Changed the shard changes api to handle the `OperationsMissingException` and wrap the exception into `ResourceNotFound` exception and include metadata to indicate the requested range can no longer be retrieved. * Changed `ShardFollowNodeTask` to handle this `ResourceNotFound` exception with the included metdata header. Relates to #35975
1 parent 14adf8a commit c8905da

File tree

6 files changed

+105
-24
lines changed

6 files changed

+105
-24
lines changed

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ public Translog.Operation next() throws IOException {
147147
private void rangeCheck(Translog.Operation op) {
148148
if (op == null) {
149149
if (lastSeenSeqNo < toSeqNo) {
150-
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
150+
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
151151
"and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
152152
}
153153
} else {
154154
final long expectedSeqNo = lastSeenSeqNo + 1;
155155
if (op.seqNo() != expectedSeqNo) {
156-
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
156+
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
157157
"and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
158158
}
159159
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
/**
23+
* Exception indicating that not all requested operations from {@link LuceneChangesSnapshot}
24+
* are available.
25+
*/
26+
public final class MissingHistoryOperationsException extends IllegalStateException {
27+
28+
MissingHistoryOperationsException(String message) {
29+
super(message);
30+
}
31+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
118118
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
119119
public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";
120120

121+
public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";
122+
121123
private final boolean enabled;
122124
private final Settings settings;
123125
private final CcrLicenseChecker ccrLicenseChecker;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.apache.logging.log4j.message.ParameterizedMessage;
9-
import org.elasticsearch.ElasticsearchException;
10-
import org.elasticsearch.ExceptionsHelper;
9+
import org.elasticsearch.ResourceNotFoundException;
1110
import org.elasticsearch.action.Action;
1211
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.action.ActionRequestValidationException;
@@ -30,16 +29,17 @@
3029
import org.elasticsearch.common.unit.TimeValue;
3130
import org.elasticsearch.index.IndexService;
3231
import org.elasticsearch.index.IndexSettings;
32+
import org.elasticsearch.index.engine.MissingHistoryOperationsException;
3333
import org.elasticsearch.index.seqno.SeqNoStats;
3434
import org.elasticsearch.index.shard.IndexShard;
3535
import org.elasticsearch.index.shard.IndexShardNotStartedException;
3636
import org.elasticsearch.index.shard.IndexShardState;
3737
import org.elasticsearch.index.shard.ShardId;
3838
import org.elasticsearch.index.translog.Translog;
3939
import org.elasticsearch.indices.IndicesService;
40-
import org.elasticsearch.tasks.Task;
4140
import org.elasticsearch.threadpool.ThreadPool;
4241
import org.elasticsearch.transport.TransportService;
42+
import org.elasticsearch.xpack.ccr.Ccr;
4343

4444
import java.io.IOException;
4545
import java.util.ArrayList;
@@ -408,21 +408,6 @@ protected void asyncShardOperation(
408408
}
409409
}
410410

411-
@Override
412-
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
413-
ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
414-
Throwable cause = ExceptionsHelper.unwrapCause(e);
415-
if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
416-
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
417-
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
418-
listener.onFailure(new ElasticsearchException(message, e));
419-
} else {
420-
listener.onFailure(e);
421-
}
422-
});
423-
super.doExecute(task, request, wrappedListener);
424-
}
425-
426411
private void globalCheckpointAdvanced(
427412
final ShardId shardId,
428413
final long globalCheckpoint,
@@ -541,6 +526,14 @@ static Translog.Operation[] getOperations(
541526
break;
542527
}
543528
}
529+
} catch (MissingHistoryOperationsException e) {
530+
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
531+
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
532+
// Make it easy to detect this error in ShardFollowNodeTask:
533+
// (adding a metadata header instead of introducing a new exception that extends ElasticsearchException)
534+
ResourceNotFoundException wrapper = new ResourceNotFoundException(message, e);
535+
wrapper.addMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY, Long.toString(fromSeqNo), Long.toString(toSeqNo));
536+
throw wrapper;
544537
}
545538
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
546539
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.ElasticsearchException;
1414
import org.elasticsearch.ElasticsearchSecurityException;
1515
import org.elasticsearch.ExceptionsHelper;
16+
import org.elasticsearch.ResourceNotFoundException;
1617
import org.elasticsearch.action.NoShardAvailableActionException;
1718
import org.elasticsearch.action.UnavailableShardsException;
1819
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.persistent.AllocatedPersistentTask;
3132
import org.elasticsearch.tasks.TaskId;
3233
import org.elasticsearch.transport.ConnectTransportException;
34+
import org.elasticsearch.xpack.ccr.Ccr;
3335
import org.elasticsearch.transport.NoSuchRemoteClusterException;
3436
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
3537
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@@ -275,6 +277,14 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
275277
failedReadRequests++;
276278
fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
277279
}
280+
Throwable cause = ExceptionsHelper.unwrapCause(e);
281+
if (cause instanceof ResourceNotFoundException) {
282+
ResourceNotFoundException resourceNotFoundException = (ResourceNotFoundException) cause;
283+
if (resourceNotFoundException.getMetadataKeys().contains(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY)) {
284+
handleFallenBehindLeaderShard(e, from, maxOperationCount, maxRequiredSeqNo, retryCounter);
285+
return;
286+
}
287+
}
278288
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
279289
});
280290
}
@@ -291,6 +301,18 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
291301
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
292302
}
293303

304+
void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
305+
// Do restore from repository here and after that
306+
// start() should be invoked and stats should be reset
307+
308+
// For now handle like any other failure:
309+
// need a more robust approach to avoid the scenario where an outstanding request
310+
// can trigger another restore while the shard was restored already.
311+
// https://github.com/elastic/elasticsearch/pull/37562#discussion_r250009367
312+
313+
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
314+
}
315+
294316
/** Called when some operations are fetched from the leading */
295317
protected void onOperationsFetched(Translog.Operation[] operations) {
296318

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.ResourceNotFoundException;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.LatchedActionListener;
912
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
1013
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
1114
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -16,12 +19,17 @@
1619
import org.elasticsearch.index.translog.Translog;
1720
import org.elasticsearch.plugins.Plugin;
1821
import org.elasticsearch.test.ESSingleNodeTestCase;
22+
import org.elasticsearch.xpack.ccr.Ccr;
1923
import org.elasticsearch.xpack.ccr.LocalStateCcr;
2024

2125
import java.util.Collection;
2226
import java.util.Collections;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicReference;
2329

30+
import static org.hamcrest.Matchers.contains;
2431
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.notNullValue;
2533

2634
public class ShardChangesTests extends ESSingleNodeTestCase {
2735

@@ -88,7 +96,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
8896
assertThat(operation.id(), equalTo("5"));
8997
}
9098

91-
public void testMissingOperations() {
99+
public void testMissingOperations() throws Exception {
92100
client().admin().indices().prepareCreate("index")
93101
.setSettings(Settings.builder()
94102
.put("index.soft_deletes.enabled", true)
@@ -113,9 +121,34 @@ public void testMissingOperations() {
113121
request.setFromSeqNo(0L);
114122
request.setMaxOperationCount(1);
115123

116-
Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
117-
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
118-
"[index.soft_deletes.retention.operations]?"));
124+
{
125+
ResourceNotFoundException e =
126+
expectThrows(ResourceNotFoundException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
127+
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
128+
"[index.soft_deletes.retention.operations]?"));
129+
130+
assertThat(e.getMetadataKeys().size(), equalTo(1));
131+
assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
132+
assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
133+
}
134+
{
135+
AtomicReference<Exception> holder = new AtomicReference<>();
136+
CountDownLatch latch = new CountDownLatch(1);
137+
client().execute(ShardChangesAction.INSTANCE, request,
138+
new LatchedActionListener<>(ActionListener.wrap(r -> fail("expected an exception"), holder::set), latch));
139+
latch.await();
140+
141+
ElasticsearchException e = (ElasticsearchException) holder.get();
142+
assertThat(e, notNullValue());
143+
assertThat(e.getMetadataKeys().size(), equalTo(0));
144+
145+
ResourceNotFoundException cause = (ResourceNotFoundException) e.getCause();
146+
assertThat(cause.getMessage(), equalTo("Operations are no longer available for replicating. " +
147+
"Maybe increase the retention setting [index.soft_deletes.retention.operations]?"));
148+
assertThat(cause.getMetadataKeys().size(), equalTo(1));
149+
assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
150+
assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
151+
}
119152
}
120153

121154
}

0 commit comments

Comments
 (0)