Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,7 @@ public interface Warmer {
* @param primaryTerm the shards primary term this engine was created for
* @return the number of no-ops added
*/
public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException;
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) {
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint());
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -226,21 +217,20 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
}

@Override
public int fillSequenceNumberHistory(long primaryTerm) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqId = seqNoService.getMaxSeqNo();
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final long maxSeqNo = seqNoService().getMaxSeqNo();
int numNoOpsAdded = 0;
for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId;
// the local checkpoint might have been advanced so we are leap-frogging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why lose the comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back in 61a2093.

// to the next seq ID we need to process and create a noop for
seqNo = seqNoService.getLocalCheckpoint()+1) {
final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history");
innerNoOp(noOp);
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:"
+ seqNoService.getLocalCheckpoint();
assert seqNo <= seqNoService().getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]";

}
return numNoOpsAdded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
indexShard.performTranslogRecovery(indexShouldExists);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm());
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
}
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -3580,6 +3579,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro

try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
}
}
Expand Down Expand Up @@ -3618,6 +3619,8 @@ public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws I

try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(1);
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1)));
}
}
Expand Down Expand Up @@ -3719,21 +3722,35 @@ public long generateSeqNo() {
throw new UnsupportedOperationException();
}
};
noOpEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
@Override
public SequenceNumbersService seqNoService() {
return seqNoService;
}
};
noOpEngine.recoverFromTranslog();
final long primaryTerm = randomNonNegativeLong();
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm);
final String reason = randomAlphaOfLength(16);
noOpEngine.noOp(
new Engine.NoOp(
maxSeqNo + 1,
primaryTerm,
randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
System.nanoTime(),
reason));
new Engine.NoOp(
maxSeqNo + 1,
primaryTerm,
randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
System.nanoTime(),
reason));
assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1));
final Translog.Operation op = noOpEngine.getTranslog().newSnapshot().next();
assertThat(op, instanceOf(Translog.NoOp.class));
final Translog.NoOp noOp = (Translog.NoOp) op;
assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled));
// skip to the op that we added to the translog
Translog.Operation op;
Translog.Operation last = null;
final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot();
while ((op = snapshot.next()) != null) {
last = op;
}
assertNotNull(last);
assertThat(last, instanceOf(Translog.NoOp.class));
final Translog.NoOp noOp = (Translog.NoOp) last;
assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOp.primaryTerm(), equalTo(primaryTerm));
assertThat(noOp.reason(), equalTo(reason));
Expand Down Expand Up @@ -3846,7 +3863,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
for (int i = 0; i < docs; i++) {
final String docId = Integer.toString(i);
final ParsedDocument doc =
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(primaryResponse);
if (randomBoolean()) {
Expand All @@ -3864,22 +3881,22 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
boolean flushed = false;
Engine recoveringEngine = null;
try {
assertEquals(docs-1, engine.seqNoService().getMaxSeqNo());
assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint());
assertEquals(docs - 1, engine.seqNoService().getMaxSeqNo());
assertEquals(docs - 1, engine.seqNoService().getLocalCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2));
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));

// now snapshot the tlog and ensure the primary term is updated
Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot();
assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations());
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation;
while((operation = snapshot.next()) != null) {
while ((operation = snapshot.next()) != null) {
if (operation.opType() == Translog.Operation.Type.NO_OP) {
assertEquals(2, operation.primaryTerm());
} else {
Expand All @@ -3905,7 +3922,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3));
assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
} finally {
Expand Down