Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,14 @@ private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
// Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
// The max_seq_no of Lucene commit in the old indices might be smaller than seq_no of some documents in the commit.
// We have to rebuild the LocalCheckpointTracker for those indices. See https://github.com/elastic/elasticsearch/pull/38879.
// Note that this bug affects only indices created between 6.5.0 and 6.6.1 with soft-deletes is explicitly enabled.
final boolean mustRebuild = engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_6_2);
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() && (localCheckpoint < maxSeqNo || mustRebuild)) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
tracker::markSeqNoAsCompleted);
final long toSeqNo = mustRebuild ? Long.MAX_VALUE : maxSeqNo;
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, toSeqNo, tracker::markSeqNoAsCompleted);
}
}
return tracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -5632,6 +5633,61 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
}
}

/**
* Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where the max_seq_no
* of the index commit can be smaller than seq_no of some documents in the commit.
*/
public void testAlwaysRebuildLocalCheckpointForOldIndex() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_5_0, Version.V_6_6_1))
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
Path translogPath = createTempDir();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(config)) {
for (Engine.Operation op : operations) {
applyOperation(engine, op);
if (randomInt(100) < 10) {
engine.flush();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
docs = getDocIds(engine, true);
}
trimUnsafeCommits(config);
// Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where max_seq_no is smaller than seq_no of some docs.
if (randomBoolean()) {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(config.getStore().directory(), iwc)) {
Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
long maxSeqNo = randomLongBetween(commitInfo.localCheckpoint, commitInfo.maxSeqNo);
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
}
try (InternalEngine engine = new InternalEngine(config)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
}

public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
try (Store store = createStore()) {
Path translogPath = createTempDir();
Expand Down