Skip to content

Commit 6c123b3

Browse files
committed
Prevent CCR recovery from missing documents (elastic#38237)
Currently the snapshot/restore process manually sets the global checkpoint to the max sequence number from the restored segements. This does not work for Ccr as this will lead to documents that would be recovered in the normal followering operation from being recovered. This commit fixes this issue by setting the initial global checkpoint to the existing local checkpoint.
1 parent 39c7acb commit 6c123b3

File tree

10 files changed

+398
-107
lines changed

10 files changed

+398
-107
lines changed

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
399399
assert indexShouldExists;
400400
store.bootstrapNewHistory();
401401
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
402-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
402+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
403403
final String translogUUID = Translog.createEmptyTranslog(
404-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
404+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
405405
store.associateIndexWithNewTranslog(translogUUID);
406406
} else if (indexShouldExists) {
407407
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -473,9 +473,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f
473473
final Store store = indexShard.store();
474474
store.bootstrapNewHistory();
475475
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
476-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
476+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
477477
final String translogUUID = Translog.createEmptyTranslog(
478-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
478+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
479479
store.associateIndexWithNewTranslog(translogUUID);
480480
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
481481
indexShard.openEngineAndRecoverFromTranslog();

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,29 +1427,28 @@ public void bootstrapNewHistory() throws IOException {
14271427
metadataLock.writeLock().lock();
14281428
try {
14291429
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
1430-
final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
1431-
bootstrapNewHistory(seqno.maxSeqNo);
1430+
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
1431+
final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
1432+
bootstrapNewHistory(localCheckpoint, maxSeqNo);
14321433
} finally {
14331434
metadataLock.writeLock().unlock();
14341435
}
14351436
}
14361437

14371438
/**
1438-
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
1439+
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
14391440
* as well as the maximum sequence number.
1440-
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
1441+
* This is used to make sure no existing shard will recover from this index using ops based recovery.
14411442
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
14421443
* @see SequenceNumbers#MAX_SEQ_NO
14431444
*/
1444-
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
1445+
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
14451446
metadataLock.writeLock().lock();
14461447
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
1447-
final Map<String, String> userData = getUserData(writer);
14481448
final Map<String, String> map = new HashMap<>();
14491449
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
1450+
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
14501451
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
1451-
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
1452-
logger.debug("bootstrap a new history_uuid [{}], user_data [{}]", map, userData);
14531452
updateCommitData(writer, map);
14541453
} finally {
14551454
metadataLock.writeLock().unlock();

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2157,9 +2157,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
21572157

21582158
public void testRestoreShard() throws IOException {
21592159
final IndexShard source = newStartedShard(true);
2160-
IndexShard target = newStartedShard(true);
2160+
IndexShard target = newStartedShard(true, Settings.builder()
2161+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
21612162

21622163
indexDoc(source, "_doc", "0");
2164+
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
2165+
indexDoc(source, "_doc", "2");
21632166
if (randomBoolean()) {
21642167
source.refresh("test");
21652168
}
@@ -2195,16 +2198,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
21952198
}
21962199
}
21972200
}));
2198-
assertThat(target.getLocalCheckpoint(), equalTo(0L));
2199-
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
2200-
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
2201+
assertThat(target.getLocalCheckpoint(), equalTo(2L));
2202+
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
2203+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
22012204
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
22022205
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
2203-
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
2206+
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
2207+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));
22042208

2205-
assertDocs(target, "0");
2209+
assertDocs(target, "0", "2");
22062210

2207-
closeShards(source, target);
2211+
closeShard(source, false);
2212+
closeShards(target);
22082213
}
22092214

22102215
public void testSearcherWrapperIsUsed() throws IOException {
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.shard;
20+
21+
import org.apache.lucene.index.IndexCommit;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.metadata.IndexMetaData;
24+
import org.elasticsearch.cluster.metadata.MetaData;
25+
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
28+
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
29+
import org.elasticsearch.index.store.Store;
30+
import org.elasticsearch.repositories.IndexId;
31+
import org.elasticsearch.repositories.Repository;
32+
import org.elasticsearch.repositories.RepositoryData;
33+
import org.elasticsearch.snapshots.SnapshotId;
34+
import org.elasticsearch.snapshots.SnapshotInfo;
35+
import org.elasticsearch.snapshots.SnapshotShardFailure;
36+
37+
import java.io.IOException;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
43+
44+
import static java.util.Collections.emptySet;
45+
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
46+
47+
/** A dummy repository for testing which just needs restore overridden */
48+
public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
49+
private final String indexName;
50+
51+
public RestoreOnlyRepository(String indexName) {
52+
this.indexName = indexName;
53+
}
54+
55+
@Override
56+
protected void doStart() {
57+
}
58+
59+
@Override
60+
protected void doStop() {
61+
}
62+
63+
@Override
64+
protected void doClose() {
65+
}
66+
67+
@Override
68+
public RepositoryMetaData getMetadata() {
69+
return null;
70+
}
71+
72+
@Override
73+
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
74+
return null;
75+
}
76+
77+
@Override
78+
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
79+
return null;
80+
}
81+
82+
@Override
83+
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
84+
return null;
85+
}
86+
87+
@Override
88+
public RepositoryData getRepositoryData() {
89+
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
90+
map.put(new IndexId(indexName, "blah"), emptySet());
91+
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
92+
}
93+
94+
@Override
95+
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
96+
}
97+
98+
@Override
99+
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
100+
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
101+
boolean includeGlobalState) {
102+
return null;
103+
}
104+
105+
@Override
106+
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
107+
}
108+
109+
@Override
110+
public long getSnapshotThrottleTimeInNanos() {
111+
return 0;
112+
}
113+
114+
@Override
115+
public long getRestoreThrottleTimeInNanos() {
116+
return 0;
117+
}
118+
119+
@Override
120+
public String startVerification() {
121+
return null;
122+
}
123+
124+
@Override
125+
public void endVerification(String verificationToken) {
126+
}
127+
128+
@Override
129+
public boolean isReadOnly() {
130+
return false;
131+
}
132+
133+
@Override
134+
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
135+
IndexShardSnapshotStatus snapshotStatus) {
136+
}
137+
138+
@Override
139+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
140+
return null;
141+
}
142+
143+
@Override
144+
public void verify(String verificationToken, DiscoveryNode localNode) {
145+
}
146+
}

test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable {
5252
private final Logger logger = LogManager.getLogger(getClass());
5353

5454
final Thread[] writers;
55+
final Client client;
5556
final CountDownLatch stopLatch;
5657
final CopyOnWriteArrayList<Exception> failures;
5758
final AtomicBoolean stop = new AtomicBoolean(false);
@@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli
122123
if (random == null) {
123124
random = RandomizedTest.getRandom();
124125
}
126+
this.client = client;
125127
useAutoGeneratedIDs = random.nextBoolean();
126128
failures = new CopyOnWriteArrayList<>();
127129
writers = new Thread[writerCount];
@@ -316,6 +318,10 @@ public void close() throws Exception {
316318
stop();
317319
}
318320

321+
public Client getClient() {
322+
return client;
323+
}
324+
319325
/**
320326
* Returns the ID set of all documents indexed by this indexer run
321327
*/

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -240,21 +240,17 @@ private void initiateFollowing(
240240
final PutFollowAction.Request request,
241241
final ActionListener<PutFollowAction.Response> listener) {
242242
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
243-
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
244-
request.waitForActiveShards(), request.timeout(), result -> {
245-
if (result) {
246-
FollowParameters parameters = request.getParameters();
247-
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
248-
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
249-
resumeFollowRequest.setParameters(new FollowParameters(parameters));
250-
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
251-
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
252-
listener::onFailure
253-
));
254-
} else {
255-
listener.onResponse(new PutFollowAction.Response(true, false, false));
256-
}
257-
}, listener::onFailure);
243+
FollowParameters parameters = request.getParameters();
244+
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
245+
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
246+
resumeFollowRequest.setParameters(new FollowParameters(parameters));
247+
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
248+
r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
249+
request.waitForActiveShards(), request.timeout(), result ->
250+
listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())),
251+
listener::onFailure),
252+
listener::onFailure
253+
));
258254
}
259255

260256
@Override

0 commit comments

Comments
 (0)