Skip to content

Commit 8ada485

Browse files
authored
Add test for PutFollowAction on a closed index (#38236) (#38377)
This is related to #35975. Currently when an index falls behind a leader it encounters a fatal exception. This commit adds a test for that scenario. Additionally, it tests that the user can stop following, close the follower index, and put follow again. After the indexing is re-bootstrapped, it will recover the documents it lost in normal following operations.
1 parent f63cbdb commit 8ada485

File tree

1 file changed

+107
-2
lines changed

1 file changed

+107
-2
lines changed

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.ResourceAlreadyExistsException;
13+
import org.elasticsearch.ResourceNotFoundException;
1114
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
1215
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1316
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@@ -16,10 +19,13 @@
1619
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1720
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
1821
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
22+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
23+
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
1924
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
2025
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
2126
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
2227
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
28+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
2329
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
2430
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
2531
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
@@ -53,6 +59,7 @@
5359
import org.elasticsearch.index.shard.ShardId;
5460
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
5561
import org.elasticsearch.rest.RestStatus;
62+
import org.elasticsearch.snapshots.SnapshotRestoreException;
5663
import org.elasticsearch.tasks.TaskInfo;
5764
import org.elasticsearch.transport.NoSuchRemoteClusterException;
5865
import org.elasticsearch.xpack.CcrIntegTestCase;
@@ -75,6 +82,7 @@
7582
import java.util.Locale;
7683
import java.util.Map;
7784
import java.util.Objects;
85+
import java.util.Set;
7886
import java.util.concurrent.Semaphore;
7987
import java.util.concurrent.TimeUnit;
8088
import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,6 +94,7 @@
8694
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
8795
import static org.hamcrest.Matchers.empty;
8896
import static org.hamcrest.Matchers.equalTo;
97+
import static org.hamcrest.Matchers.greaterThan;
8998
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
9099
import static org.hamcrest.Matchers.hasSize;
91100
import static org.hamcrest.Matchers.is;
@@ -943,6 +952,98 @@ public void testUpdateAnalysisLeaderIndexSettings() throws Exception {
943952
assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true));
944953
}
945954

955+
public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception {
956+
final int numberOfPrimaryShards = randomIntBetween(1, 3);
957+
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
958+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
959+
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
960+
ensureLeaderYellow("index1");
961+
962+
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
963+
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
964+
assertTrue(response.isFollowIndexCreated());
965+
assertTrue(response.isFollowIndexShardsAcked());
966+
assertTrue(response.isIndexFollowingStarted());
967+
968+
final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
969+
expectThrows(SnapshotRestoreException.class,
970+
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());
971+
972+
followerClient().admin().indices().prepareClose("index2").get();
973+
expectThrows(ResourceAlreadyExistsException.class,
974+
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());
975+
}
976+
977+
public void testIndexFallBehind() throws Exception {
978+
final int numberOfPrimaryShards = randomIntBetween(1, 3);
979+
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
980+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
981+
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
982+
ensureLeaderYellow("index1");
983+
984+
final int numDocs = randomIntBetween(2, 64);
985+
logger.info("Indexing [{}] docs as first batch", numDocs);
986+
for (int i = 0; i < numDocs; i++) {
987+
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
988+
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
989+
}
990+
991+
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
992+
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
993+
assertTrue(response.isFollowIndexCreated());
994+
assertTrue(response.isFollowIndexShardsAcked());
995+
assertTrue(response.isIndexFollowingStarted());
996+
997+
assertIndexFullyReplicatedToFollower("index1", "index2");
998+
for (int i = 0; i < numDocs; i++) {
999+
assertBusy(assertExpectedDocumentRunnable(i));
1000+
}
1001+
1002+
pauseFollow("index2");
1003+
1004+
for (int i = 0; i < numDocs; i++) {
1005+
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2);
1006+
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
1007+
}
1008+
leaderClient().prepareDelete("index1", "doc", "1").get();
1009+
leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet();
1010+
leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet();
1011+
ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1");
1012+
forceMergeRequest.maxNumSegments(1);
1013+
leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet();
1014+
1015+
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
1016+
1017+
assertBusy(() -> {
1018+
List<ShardFollowNodeTaskStatus> statuses = getFollowTaskStatuses("index2");
1019+
Set<ResourceNotFoundException> exceptions = statuses.stream()
1020+
.map(ShardFollowNodeTaskStatus::getFatalException)
1021+
.filter(Objects::nonNull)
1022+
.map(ExceptionsHelper::unwrapCause)
1023+
.filter(e -> e instanceof ResourceNotFoundException)
1024+
.map(e -> (ResourceNotFoundException) e)
1025+
.filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing"))
1026+
.collect(Collectors.toSet());
1027+
assertThat(exceptions.size(), greaterThan(0));
1028+
});
1029+
1030+
followerClient().admin().indices().prepareClose("index2").get();
1031+
pauseFollow("index2");
1032+
1033+
1034+
final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
1035+
PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get();
1036+
assertTrue(response2.isFollowIndexCreated());
1037+
assertTrue(response2.isFollowIndexShardsAcked());
1038+
assertTrue(response2.isIndexFollowingStarted());
1039+
1040+
ensureFollowerGreen("index2");
1041+
assertIndexFullyReplicatedToFollower("index1", "index2");
1042+
for (int i = 2; i < numDocs; i++) {
1043+
assertBusy(assertExpectedDocumentRunnable(i, i * 2));
1044+
}
1045+
}
1046+
9461047
private long getFollowTaskSettingsVersion(String followerIndex) {
9471048
long settingsVersion = -1L;
9481049
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
@@ -1028,9 +1129,13 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
10281129
}
10291130

10301131
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
1132+
return assertExpectedDocumentRunnable(value, value);
1133+
}
1134+
1135+
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int key, final int value) {
10311136
return () -> {
1032-
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
1033-
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
1137+
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(key)).get();
1138+
assertTrue("Doc with id [" + key + "] is missing", getResponse.isExists());
10341139
assertTrue((getResponse.getSource().containsKey("f")));
10351140
assertThat(getResponse.getSource().get("f"), equalTo(value));
10361141
};

0 commit comments

Comments
 (0)