Skip to content

Commit 3cda401

Browse files
committed
Keeps index commits up to the global checkpoint
We need to keep index commits and translog operations up to the current global checkpoint for the operation-based recovery. These can be done by introducing a new deletion policy. The new policy keeps the latest (eg. youngest) commit whose local checkpoint is not greater than the current global checkpoint, and also keeps all subsequent commits. Once those commits are kept, a CombinedDeletionPolicy will retain translog operations at least up to the current global checkpoint.
1 parent 5c34533 commit 3cda401

File tree

4 files changed

+303
-12
lines changed

4 files changed

+303
-12
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.lucene.index.IndexReader;
2626
import org.apache.lucene.index.IndexWriter;
2727
import org.apache.lucene.index.IndexWriterConfig;
28-
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
2928
import org.apache.lucene.index.LeafReader;
3029
import org.apache.lucene.index.LiveIndexWriterConfig;
3130
import org.apache.lucene.index.MergePolicy;
@@ -163,12 +162,6 @@ public InternalEngine(EngineConfig engineConfig) {
163162
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
164163
}
165164
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
166-
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
167-
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
168-
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
169-
);
170-
this.deletionPolicy = new CombinedDeletionPolicy(
171-
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
172165
store.incRef();
173166
IndexWriter writer = null;
174167
Translog translog = null;
@@ -183,28 +176,37 @@ public InternalEngine(EngineConfig engineConfig) {
183176
throttle = new IndexThrottle();
184177
try {
185178
final SeqNoStats seqNoStats;
179+
final boolean shouldCreateIndex;
186180
switch (openMode) {
187181
case OPEN_INDEX_AND_TRANSLOG:
188-
writer = createWriter(false);
189182
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
190183
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
184+
shouldCreateIndex = false;
191185
break;
192186
case OPEN_INDEX_CREATE_TRANSLOG:
193-
writer = createWriter(false);
194187
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
188+
shouldCreateIndex = false;
195189
break;
196190
case CREATE_INDEX_AND_TRANSLOG:
197-
writer = createWriter(true);
198191
seqNoStats = new SeqNoStats(
199192
SequenceNumbers.NO_OPS_PERFORMED,
200193
SequenceNumbers.NO_OPS_PERFORMED,
201194
SequenceNumbers.UNASSIGNED_SEQ_NO);
195+
shouldCreateIndex = true;
202196
break;
203197
default:
204198
throw new IllegalArgumentException(openMode.toString());
205199
}
206200
logger.trace("recovered [{}]", seqNoStats);
207201
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
202+
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
203+
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
204+
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
205+
);
206+
this.deletionPolicy = new CombinedDeletionPolicy(
207+
new SnapshotDeletionPolicy(new KeepUntilGlobalCheckpointDeletionPolicy(seqNoService::getGlobalCheckpoint)),
208+
translogDeletionPolicy, openMode);
209+
writer = createWriter(shouldCreateIndex);
208210
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
209211
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
210212
Objects.requireNonNull(historyUUID, "history uuid should not be null");
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.index.IndexCommit;
23+
import org.apache.lucene.index.IndexDeletionPolicy;
24+
import org.elasticsearch.index.seqno.SequenceNumbers;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
import java.util.function.LongSupplier;
29+
30+
/**
31+
* An {@link IndexDeletionPolicy} keeps the latest (eg. youngest) commit whose local checkpoint is not
32+
* greater than the current global checkpoint, and also keeps all subsequent commits. Once those
33+
* commits are kept, a {@link CombinedDeletionPolicy} will retain translog operations at least up to
34+
* the current global checkpoint.
35+
*/
36+
public final class KeepUntilGlobalCheckpointDeletionPolicy extends IndexDeletionPolicy {
37+
private final LongSupplier globalCheckpointSupplier;
38+
39+
public KeepUntilGlobalCheckpointDeletionPolicy(LongSupplier globalCheckpointSupplier) {
40+
this.globalCheckpointSupplier = globalCheckpointSupplier;
41+
}
42+
43+
@Override
44+
public void onInit(List<? extends IndexCommit> commits) throws IOException {
45+
onCommit(commits);
46+
}
47+
48+
@Override
49+
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
50+
final long globalCheckpoint = globalCheckpointSupplier.getAsLong();
51+
for (int i = commits.size() - 1; i >= 0; i--) {
52+
if (localCheckpoint(commits.get(i)) <= globalCheckpoint) {
53+
i--; // This is the youngest commit whose local checkpoint <= global checkpoint - reserve it, then delete all previous ones.
54+
for (; i >= 0; i--) {
55+
commits.get(i).delete();
56+
}
57+
break;
58+
}
59+
}
60+
}
61+
62+
private static long localCheckpoint(IndexCommit commit) throws IOException {
63+
return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
64+
}
65+
}
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.document.Field;
23+
import org.apache.lucene.index.DirectoryReader;
24+
import org.apache.lucene.index.IndexCommit;
25+
import org.elasticsearch.common.bytes.BytesReference;
26+
import org.elasticsearch.index.mapper.ParseContext;
27+
import org.elasticsearch.index.mapper.ParsedDocument;
28+
import org.elasticsearch.index.mapper.SourceFieldMapper;
29+
import org.elasticsearch.index.seqno.SequenceNumbers;
30+
import org.elasticsearch.index.seqno.SequenceNumbersService;
31+
import org.elasticsearch.index.store.Store;
32+
import org.elasticsearch.index.translog.Translog;
33+
import org.junit.Before;
34+
35+
import java.io.IOException;
36+
import java.nio.file.Path;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicLong;
41+
42+
import static org.hamcrest.Matchers.equalTo;
43+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
44+
import static org.hamcrest.Matchers.hasSize;
45+
46+
public class KeepUntilGlobalCheckpointDeletionPolicyTests extends EngineTestCase {
47+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
48+
final AtomicInteger docId = new AtomicInteger();
49+
50+
@Before
51+
public void resetCounters() throws Exception {
52+
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
53+
docId.set(0);
54+
}
55+
56+
public void testUnassignedGlobalCheckpoint() throws IOException {
57+
Path indexPath = createTempDir();
58+
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
59+
try (Store store = createStore()) {
60+
int initDocs = scaledRandomIntBetween(10, 1000);
61+
int initCommits = 1;
62+
try (InternalEngine engine = newEngine(store, indexPath)) {
63+
for (int i = 0; i < initDocs; i++) {
64+
addDoc(engine);
65+
if (frequently()) {
66+
initCommits++;
67+
engine.flush(true, true);
68+
}
69+
if (rarely()) {
70+
engine.rollTranslogGeneration();
71+
}
72+
}
73+
engine.flush(true, true);
74+
}
75+
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(initCommits + 1));
76+
try (InternalEngine engine = newEngine(store, indexPath)) {
77+
engine.refresh("test");
78+
assertThat("Unassigned global checkpoint reserves all commits", DirectoryReader.listCommits(store.directory()),
79+
hasSize(initCommits + 1));
80+
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
81+
assertThat("Unassigned global checkpoint reserves all translog", snapshot.totalOperations(), equalTo(initDocs));
82+
}
83+
int moreDocs = scaledRandomIntBetween(1, 100);
84+
int extraCommits = 0;
85+
for (int i = 0; i < moreDocs; i++) {
86+
addDoc(engine);
87+
if (frequently()) {
88+
engine.flush(true, true);
89+
extraCommits++;
90+
}
91+
}
92+
assertThat("Unassigned global checkpoint reserves all commits", DirectoryReader.listCommits(store.directory()),
93+
hasSize(initCommits + 1 + extraCommits));
94+
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
95+
assertThat("Unassigned global checkpoint reserves all translog", snapshot.totalOperations(),
96+
equalTo(initDocs + moreDocs));
97+
}
98+
}
99+
}
100+
}
101+
102+
public void testKeepUpGlobalCheckpoint() throws Exception {
103+
Path indexPath = createTempDir();
104+
try (Store store = createStore()) {
105+
int initDocs = scaledRandomIntBetween(10, 1000);
106+
try (InternalEngine engine = newEngine(store, indexPath)) {
107+
for (int i = 0; i < initDocs; i++) {
108+
addDoc(engine);
109+
globalCheckpoint.set(engine.seqNoService().getLocalCheckpoint());
110+
if (frequently()) {
111+
engine.flush(true, true);
112+
}
113+
}
114+
engine.flush(true, true);
115+
}
116+
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1));
117+
try (InternalEngine engine = newEngine(store, indexPath)) {
118+
assertThat("OnInit deletes unreferenced commits", DirectoryReader.listCommits(store.directory()), hasSize(1));
119+
int moreDocs = scaledRandomIntBetween(1, 100);
120+
for (int i = 0; i < moreDocs; i++) {
121+
addDoc(engine);
122+
globalCheckpoint.set(engine.seqNoService().getLocalCheckpoint());
123+
if (frequently()) {
124+
engine.flush(true, true);
125+
assertThat("OnCommit deletes unreferenced commits", DirectoryReader.listCommits(store.directory()), hasSize(1));
126+
}
127+
}
128+
}
129+
}
130+
}
131+
132+
public void testLaggingGlobalCheckpoint() throws Exception {
133+
Path indexPath = createTempDir();
134+
try (Store store = createStore()) {
135+
int initDocs = scaledRandomIntBetween(100, 1000);
136+
try (InternalEngine engine = newEngine(store, indexPath)) {
137+
for (int i = 0; i < initDocs; i++) {
138+
addDoc(engine);
139+
if (frequently()) {
140+
globalCheckpoint.set(engine.seqNoService().getLocalCheckpoint());
141+
}
142+
if (frequently()) {
143+
engine.flush(true, true);
144+
}
145+
if (rarely()) {
146+
engine.rollTranslogGeneration();
147+
}
148+
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
149+
assertThat((long) snapshot.totalOperations(), greaterThanOrEqualTo(requiredOperations(i + 1)));
150+
}
151+
}
152+
engine.flush(true, true);
153+
}
154+
assertThat("Reserved commits should be 1", reservedCommits(), hasSize(1));
155+
156+
try (InternalEngine engine = newEngine(store, indexPath)) {
157+
assertThat("Reserved commits should always be 1", reservedCommits(), hasSize(1));
158+
int moreDocs = scaledRandomIntBetween(1, 100);
159+
for (int i = 0; i < moreDocs; i++) {
160+
addDoc(engine);
161+
if (frequently()) {
162+
globalCheckpoint.set(engine.seqNoService().getLocalCheckpoint());
163+
}
164+
if (frequently()) {
165+
engine.flush(true, true);
166+
assertThat("Reserved commits should be 1", reservedCommits(), hasSize(1));
167+
}
168+
if (rarely()) {
169+
engine.rollTranslogGeneration();
170+
}
171+
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
172+
assertThat((long) snapshot.totalOperations(), greaterThanOrEqualTo(requiredOperations(initDocs + i + 1)));
173+
}
174+
}
175+
}
176+
}
177+
}
178+
179+
long requiredOperations(int processedOps) {
180+
return processedOps - Math.max(0, globalCheckpoint.get());
181+
}
182+
183+
List<IndexCommit> reservedCommits() throws IOException {
184+
List<IndexCommit> reservedCommits = new ArrayList<>();
185+
List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
186+
for (IndexCommit commit : existingCommits) {
187+
if (Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) <= globalCheckpoint.get()) {
188+
reservedCommits.add(commit);
189+
}
190+
}
191+
return reservedCommits;
192+
}
193+
194+
void addDoc(Engine engine) throws IOException {
195+
ParseContext.Document document = testDocumentWithTextField();
196+
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
197+
ParsedDocument doc = testParsedDocument(Integer.toString(docId.getAndIncrement()), null, document, B_1, null);
198+
engine.index(indexForDoc(doc));
199+
}
200+
201+
InternalEngine newEngine(Store store, Path indexPath) throws IOException {
202+
return createEngine(defaultSettings, store, indexPath, newMergePolicy(), null,
203+
(config, seqNoStats) -> new SequenceNumbersService(
204+
config.getShardId(),
205+
config.getAllocationId(),
206+
config.getIndexSettings(),
207+
seqNoStats.getMaxSeqNo(),
208+
seqNoStats.getLocalCheckpoint(),
209+
seqNoStats.getGlobalCheckpoint()) {
210+
@Override
211+
public long getGlobalCheckpoint() {
212+
return globalCheckpoint.get();
213+
}
214+
}
215+
);
216+
}
217+
}

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,8 +1042,9 @@ public void onFailure(Exception e) {
10421042
closeShards(indexShard);
10431043
}
10441044

1045-
public void testAcquireIndexCommit() throws IOException {
1046-
final IndexShard shard = newStartedShard();
1045+
public void testAcquireIndexCommit() throws Exception {
1046+
boolean isPrimary = randomBoolean();
1047+
final IndexShard shard = newStartedShard(isPrimary);
10471048
int numDocs = randomInt(20);
10481049
for (int i = 0; i < numDocs; i++) {
10491050
indexDoc(shard, "type", "id_" + i);
@@ -1060,6 +1061,12 @@ public void testAcquireIndexCommit() throws IOException {
10601061
assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
10611062
}
10621063
commit.close();
1064+
// Make the global checkpoint in sync with the local checkpoint.
1065+
if (isPrimary) {
1066+
shard.getEngine().seqNoService().markAllocationIdAsInSync(shard.shardRouting.allocationId().getId(), numDocs + moreDocs - 1);
1067+
} else {
1068+
shard.updateGlobalCheckpointOnReplica(shard.getLocalCheckpoint(), "test");
1069+
}
10631070
flushShard(shard, true);
10641071

10651072
// check it's clean up

0 commit comments

Comments
 (0)