Skip to content

Commit a45a3cc

Browse files
tlrx and martijnvg
authored
Specialize pre-closing checks for engine implementations (#38702) (#38723)
The Close Index API was refactored in 6.7.0 and now performs pre-closing sanity checks on shards before an index is closed: the maximum sequence number must be equal to the global checkpoint. While this is a strong requirement for regular shards, we identified the need to relax this check in the case of CCR following shards. The following shards are not in charge of managing the max sequence number or global checkpoint, which are pulled from a leader shard. They also fetch and process batches of operations from the leader in an unordered way, potentially leaving gaps in the history of ops. If the following shard lags a lot, it's possible that the global checkpoint and max seq number never get in sync, preventing the following shard from being closed and a new PUT Follow action from being issued on this shard (which is our recommended way to resume/restart a CCR following). This commit allows each Engine implementation to define the specific verification it must perform before closing the index. In order to allow following/frozen/closed shards to be closed whatever the max seq number or global checkpoint are, the FollowingEngine and ReadOnlyEngine do not perform any check before the index is closed. Co-authored-by: Martijn van Groningen <[email protected]>
1 parent 84353a6 commit a45a3cc

File tree

9 files changed

+183
-21
lines changed

9 files changed

+183
-21
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,7 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
108108
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
109109
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
110110
}
111-
112-
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
113-
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
114-
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
115-
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
116-
}
117-
111+
indexShard.verifyShardBeforeIndexClosing();
118112
indexShard.flush(new FlushRequest().force(true));
119113
logger.trace("{} shard is ready for closing", shardId);
120114
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,20 @@ protected final DocsStats docsStats(IndexReader indexReader) {
265265
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
266266
}
267267

268+
/**
269+
* Performs the pre-closing checks on the {@link Engine}: the global checkpoint must be equal to the maximum sequence number.
270+
*
271+
* @throws IllegalStateException if the global checkpoint differs from the maximum sequence number
272+
*/
273+
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
274+
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
275+
final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo();
276+
if (globalCheckpoint != maxSeqNo) {
277+
throw new IllegalStateException("Global checkpoint [" + globalCheckpoint
278+
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
279+
}
280+
}
281+
268282
/**
269283
* A throttling class that can be activated, causing the
270284
* {@code acquireThrottle} method to block on a lock when throttling

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,16 @@ protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final
142142
}
143143
}
144144

145+
@Override
146+
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
147+
// the value of the global checkpoint is verified when the read-only engine is opened,
148+
// and it is not expected to change during the lifecycle of the engine. We could also
149+
// check this value before closing the read-only engine but if something went wrong
150+
// and the global checkpoint is not in-sync with the max. sequence number anymore,
151+
// checking the value here again would prevent the read-only engine from being closed and
152+
// reopened as an internal engine, which would be the path to fix the issue.
153+
}
154+
145155
protected final DirectoryReader wrapReader(DirectoryReader reader,
146156
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
147157
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3092,4 +3092,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
30923092
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
30933093
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
30943094
}
3095+
3096+
/**
3097+
* Performs the pre-closing checks on the {@link IndexShard} by delegating to the current engine's pre-closing verification.
3098+
*
3099+
* @throws IllegalStateException if the engine's sanity checks failed
3100+
*/
3101+
public void verifyShardBeforeIndexClosing() throws IllegalStateException {
3102+
getEngine().verifyEngineBeforeIndexClosing();
3103+
}
30953104
}

server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import org.elasticsearch.cluster.service.ClusterService;
4141
import org.elasticsearch.common.settings.Settings;
4242
import org.elasticsearch.index.engine.Engine;
43-
import org.elasticsearch.index.seqno.SeqNoStats;
44-
import org.elasticsearch.index.seqno.SequenceNumbers;
4543
import org.elasticsearch.index.shard.IndexShard;
4644
import org.elasticsearch.index.shard.ReplicationGroup;
4745
import org.elasticsearch.index.shard.ShardId;
@@ -73,6 +71,7 @@
7371
import static org.hamcrest.Matchers.instanceOf;
7472
import static org.hamcrest.Matchers.is;
7573
import static org.mockito.Matchers.any;
74+
import static org.mockito.Mockito.doThrow;
7675
import static org.mockito.Mockito.mock;
7776
import static org.mockito.Mockito.times;
7877
import static org.mockito.Mockito.verify;
@@ -100,8 +99,6 @@ public void setUp() throws Exception {
10099

101100
indexShard = mock(IndexShard.class);
102101
when(indexShard.getActiveOperationsCount()).thenReturn(0);
103-
when(indexShard.getGlobalCheckpoint()).thenReturn(0L);
104-
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L));
105102

106103
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
107104
when(indexShard.shardId()).thenReturn(shardId);
@@ -174,17 +171,16 @@ public void testOperationFailsWithNoBlock() {
174171
verify(indexShard, times(0)).flush(any(FlushRequest.class));
175172
}
176173

177-
public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
178-
final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE);
179-
final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo);
180-
final long globalCheckpoint = randomValueOtherThan(maxSeqNo,
181-
() -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint));
182-
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint));
183-
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
174+
public void testVerifyShardBeforeIndexClosing() throws Exception {
175+
executeOnPrimaryOrReplica();
176+
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
177+
verify(indexShard, times(1)).flush(any(FlushRequest.class));
178+
}
184179

185-
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
186-
assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number ["
187-
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
180+
public void testVerifyShardBeforeIndexClosingFailed() {
181+
doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing();
182+
expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
183+
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
188184
verify(indexShard, times(0)).flush(any(FlushRequest.class));
189185
}
190186

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,4 +189,25 @@ public void testReadOnly() throws IOException {
189189
}
190190
}
191191
}
192+
193+
/**
194+
* Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails
195+
* whatever the value of the global checkpoint to check is.
196+
*/
197+
public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
198+
IOUtils.close(engine, store);
199+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
200+
try (Store store = createStore()) {
201+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
202+
store.createEmpty(Version.CURRENT.luceneVersion);
203+
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
204+
globalCheckpoint.set(randomNonNegativeLong());
205+
try {
206+
readOnlyEngine.verifyEngineBeforeIndexClosing();
207+
} catch (final IllegalStateException e) {
208+
fail("Read-only engine pre-closing verifications failed");
209+
}
210+
}
211+
}
212+
}
192213
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,12 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
195195
public long getNumberOfOptimizedIndexing() {
196196
return numOfOptimizedIndexing.count();
197197
}
198+
199+
@Override
200+
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
201+
// the value of the global checkpoint is not verified when the following engine is closed,
202+
// allowing it to be closed even in the case where not all operations have been fetched and
203+
// processed from the leader and the operations history has gaps. This way the following
204+
// engine can be closed and reopened in order to bootstrap the follower index again.
205+
}
198206
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr;
7+
8+
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
9+
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
10+
import org.elasticsearch.action.search.SearchRequest;
11+
import org.elasticsearch.action.support.ActiveShardCount;
12+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.block.ClusterBlock;
15+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
16+
import org.elasticsearch.common.unit.ByteSizeValue;
17+
import org.elasticsearch.common.unit.TimeValue;
18+
import org.elasticsearch.common.xcontent.XContentType;
19+
import org.elasticsearch.index.IndexSettings;
20+
import org.elasticsearch.xpack.CcrIntegTestCase;
21+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import static java.util.Collections.singletonMap;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.is;
31+
32+
public class CloseFollowerIndexIT extends CcrIntegTestCase {
33+
34+
public void testCloseAndReopenFollowerIndex() throws Exception {
35+
final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
36+
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
37+
ensureLeaderYellow("index1");
38+
39+
PutFollowAction.Request followRequest = new PutFollowAction.Request();
40+
followRequest.setRemoteCluster("leader_cluster");
41+
followRequest.setLeaderIndex("index1");
42+
followRequest.setFollowerIndex("index2");
43+
followRequest.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
44+
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
45+
followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1));
46+
followRequest.getParameters().setMaxOutstandingReadRequests(128);
47+
followRequest.waitForActiveShards(ActiveShardCount.DEFAULT);
48+
49+
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
50+
ensureFollowerGreen("index2");
51+
52+
AtomicBoolean isRunning = new AtomicBoolean(true);
53+
int numThreads = 4;
54+
Thread[] threads = new Thread[numThreads];
55+
for (int i = 0; i < numThreads; i++) {
56+
threads[i] = new Thread(() -> {
57+
while (isRunning.get()) {
58+
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
59+
}
60+
});
61+
threads[i].start();
62+
}
63+
64+
atLeastDocsIndexed(followerClient(), "index2", 32);
65+
AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get();
66+
assertThat(response.isAcknowledged(), is(true));
67+
68+
ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
69+
List<ClusterBlock> blocks = new ArrayList<>(clusterState.getBlocks().indices().get("index2"));
70+
assertThat(blocks.size(), equalTo(1));
71+
assertThat(blocks.get(0).id(), equalTo(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID));
72+
73+
isRunning.set(false);
74+
for (Thread thread : threads) {
75+
thread.join();
76+
}
77+
assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get());
78+
79+
refresh(leaderClient(), "index1");
80+
SearchRequest leaderSearchRequest = new SearchRequest("index1");
81+
leaderSearchRequest.source().trackTotalHits(true);
82+
long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value;
83+
assertBusy(() -> {
84+
refresh(followerClient(), "index2");
85+
SearchRequest followerSearchRequest = new SearchRequest("index2");
86+
followerSearchRequest.source().trackTotalHits(true);
87+
long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value;
88+
assertThat(followerIndexDocs, equalTo(leaderIndexDocs));
89+
});
90+
}
91+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,4 +640,23 @@ public void testProcessOnceOnPrimary() throws Exception {
640640
}
641641
}
642642
}
643+
644+
/**
645+
* Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails
646+
* whatever the value of the global checkpoint to check is.
647+
*/
648+
public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
649+
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
650+
runIndexTest(
651+
seqNo,
652+
Engine.Operation.Origin.PRIMARY,
653+
(followingEngine, index) -> {
654+
globalCheckpoint.set(randomNonNegativeLong());
655+
try {
656+
followingEngine.verifyEngineBeforeIndexClosing();
657+
} catch (final IllegalStateException e) {
658+
fail("Following engine pre-closing verifications failed");
659+
}
660+
});
661+
}
643662
}

0 commit comments

Comments
 (0)