Skip to content

Commit bbdaf11

Browse files
authored
Remove gap skipping when opening engine
Today, when opening the engine, we skip gaps in the history by advancing the local checkpoint until it is equal to the maximum sequence number contained in the commit. This allows history to advance, but it leaves gaps. A previous change filled these gaps when recovering from store, but since we were already skipping the gaps while opening the engine, that change had no effect. This commit removes the gap skipping when opening the engine, allowing the gap filling to do its job. Relates #24535.
1 parent 59dd4d2 commit bbdaf11

File tree

4 files changed

+49
-42
lines changed

4 files changed

+49
-42
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1421,7 +1421,7 @@ public interface Warmer {
14211421
* @param primaryTerm the shards primary term this engine was created for
14221422
* @return the number of no-ops added
14231423
*/
1424-
public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException;
1424+
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
14251425

14261426
/**
14271427
* Performs recovery from the transaction log.

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -177,15 +177,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
177177
logger.trace("recovered [{}]", seqNoStats);
178178
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
179179
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
180-
// norelease
181-
/*
182-
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
183-
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
184-
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
185-
*/
186-
while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) {
187-
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
188-
}
189180
indexWriter = writer;
190181
translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint());
191182
assert translog.getGeneration() != null;
@@ -226,21 +217,20 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
226217
}
227218

228219
@Override
229-
public int fillSequenceNumberHistory(long primaryTerm) throws IOException {
230-
try (ReleasableLock lock = writeLock.acquire()) {
220+
public int fillSeqNoGaps(long primaryTerm) throws IOException {
221+
try (ReleasableLock ignored = writeLock.acquire()) {
231222
ensureOpen();
232-
final long localCheckpoint = seqNoService.getLocalCheckpoint();
233-
final long maxSeqId = seqNoService.getMaxSeqNo();
223+
final long localCheckpoint = seqNoService().getLocalCheckpoint();
224+
final long maxSeqNo = seqNoService().getMaxSeqNo();
234225
int numNoOpsAdded = 0;
235-
for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId;
236-
// the local checkpoint might have been advanced so we are leap-frogging
237-
// to the next seq ID we need to process and create a noop for
238-
seqNo = seqNoService.getLocalCheckpoint()+1) {
239-
final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history");
240-
innerNoOp(noOp);
226+
for (
227+
long seqNo = localCheckpoint + 1;
228+
seqNo <= maxSeqNo;
229+
seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
230+
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
241231
numNoOpsAdded++;
242-
assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:"
243-
+ seqNoService.getLocalCheckpoint();
232+
assert seqNo <= seqNoService().getLocalCheckpoint()
233+
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]";
244234

245235
}
246236
return numNoOpsAdded;

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
365365
}
366366
indexShard.performTranslogRecovery(indexShouldExists);
367367
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
368-
indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm());
368+
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
369369
}
370370
indexShard.finalizeRecovery();
371371
indexShard.postRecovery("post recovery from shard_store");

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@
114114
import org.elasticsearch.index.mapper.RootObjectMapper;
115115
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
116116
import org.elasticsearch.index.mapper.SourceFieldMapper;
117-
import org.elasticsearch.index.mapper.SourceToParse;
118117
import org.elasticsearch.index.mapper.Uid;
119118
import org.elasticsearch.index.mapper.UidFieldMapper;
120119
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -3580,6 +3579,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
35803579

35813580
try (Engine recoveringEngine =
35823581
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
3582+
recoveringEngine.recoverFromTranslog();
3583+
recoveringEngine.fillSeqNoGaps(2);
35833584
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
35843585
}
35853586
}
@@ -3618,6 +3619,8 @@ public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws I
36183619

36193620
try (Engine recoveringEngine =
36203621
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
3622+
recoveringEngine.recoverFromTranslog();
3623+
recoveringEngine.fillSeqNoGaps(1);
36213624
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1)));
36223625
}
36233626
}
@@ -3719,21 +3722,35 @@ public long generateSeqNo() {
37193722
throw new UnsupportedOperationException();
37203723
}
37213724
};
3722-
noOpEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
3725+
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
3726+
@Override
3727+
public SequenceNumbersService seqNoService() {
3728+
return seqNoService;
3729+
}
3730+
};
3731+
noOpEngine.recoverFromTranslog();
37233732
final long primaryTerm = randomNonNegativeLong();
3733+
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm);
37243734
final String reason = randomAlphaOfLength(16);
37253735
noOpEngine.noOp(
3726-
new Engine.NoOp(
3727-
maxSeqNo + 1,
3728-
primaryTerm,
3729-
randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
3730-
System.nanoTime(),
3731-
reason));
3736+
new Engine.NoOp(
3737+
maxSeqNo + 1,
3738+
primaryTerm,
3739+
randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
3740+
System.nanoTime(),
3741+
reason));
37323742
assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1)));
3733-
assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1));
3734-
final Translog.Operation op = noOpEngine.getTranslog().newSnapshot().next();
3735-
assertThat(op, instanceOf(Translog.NoOp.class));
3736-
final Translog.NoOp noOp = (Translog.NoOp) op;
3743+
assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled));
3744+
// skip to the op that we added to the translog
3745+
Translog.Operation op;
3746+
Translog.Operation last = null;
3747+
final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot();
3748+
while ((op = snapshot.next()) != null) {
3749+
last = op;
3750+
}
3751+
assertNotNull(last);
3752+
assertThat(last, instanceOf(Translog.NoOp.class));
3753+
final Translog.NoOp noOp = (Translog.NoOp) last;
37373754
assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1)));
37383755
assertThat(noOp.primaryTerm(), equalTo(primaryTerm));
37393756
assertThat(noOp.reason(), equalTo(reason));
@@ -3846,7 +3863,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
38463863
for (int i = 0; i < docs; i++) {
38473864
final String docId = Integer.toString(i);
38483865
final ParsedDocument doc =
3849-
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
3866+
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
38503867
Engine.Index primaryResponse = indexForDoc(doc);
38513868
Engine.IndexResult indexResult = engine.index(primaryResponse);
38523869
if (randomBoolean()) {
@@ -3864,22 +3881,22 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
38643881
boolean flushed = false;
38653882
Engine recoveringEngine = null;
38663883
try {
3867-
assertEquals(docs-1, engine.seqNoService().getMaxSeqNo());
3868-
assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint());
3884+
assertEquals(docs - 1, engine.seqNoService().getMaxSeqNo());
3885+
assertEquals(docs - 1, engine.seqNoService().getLocalCheckpoint());
38693886
assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
38703887
assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
38713888
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
38723889
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations());
38733890
recoveringEngine.recoverFromTranslog();
38743891
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
38753892
assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
3876-
assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2));
3893+
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
38773894

38783895
// now snapshot the tlog and ensure the primary term is updated
38793896
Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot();
3880-
assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations());
3897+
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
38813898
Translog.Operation operation;
3882-
while((operation = snapshot.next()) != null) {
3899+
while ((operation = snapshot.next()) != null) {
38833900
if (operation.opType() == Translog.Operation.Type.NO_OP) {
38843901
assertEquals(2, operation.primaryTerm());
38853902
} else {
@@ -3905,7 +3922,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
39053922
recoveringEngine.recoverFromTranslog();
39063923
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
39073924
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
3908-
assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3));
3925+
assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
39093926
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
39103927
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
39113928
} finally {

0 commit comments

Comments
 (0)