server/src/main/java/org/elasticsearch/common/lucene/Lucene.java (39 additions, 0 deletions)

@@ -27,6 +27,7 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
@@ -42,6 +43,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
@@ -82,6 +84,7 @@
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;
import java.text.ParseException;
@@ -91,6 +94,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -968,4 +972,39 @@ public CacheHelper getReaderCacheHelper() {
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}

/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
*
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
*/
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
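// wrap the reader so soft-deleted documents are treated as live: the scan must also see operations that were deleted or updated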
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
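// seq_no is indexed both as a LongPoint (used for this range query) and as numeric doc values (read per hit below)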
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
onNewSeqNo.accept(seqNo);
}
}
}
}
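
For illustration, a minimal caller of this helper might look like the sketch below. This is a hypothetical usage example, not part of the change: the directory variable and the collecting list are assumptions, and the reader must come from an index whose seq_no field is indexed as both points and doc values (as SeqNoFieldMapper does).

// Hypothetical usage sketch: collect every seq_no in [0, 42] present in the index.
// Assumes: import java.util.ArrayList; import java.util.List;
List<Long> seqNos = new ArrayList<>();
try (DirectoryReader reader = DirectoryReader.open(directory)) { // "directory" is an assumed open Directory
    Lucene.scanSeqNosInReader(reader, 0, 42, seqNos::add); // seqNos::add boxes each long value
}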
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -102,6 +102,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {
@@ -189,7 +190,6 @@ public InternalEngine(EngineConfig engineConfig) {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
@@ -223,6 +223,8 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
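// created only after the searcher manager is ready: rebuilding the tracker may need to acquire a searcher to scan the last commit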
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
success = true;
@@ -238,16 +240,29 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// Operations that are optimized through the max_seq_no_of_updates (MSU) optimization must not be processed twice; otherwise
// they will create duplicates in Lucene. To avoid this, we check the LocalCheckpointTracker to see whether an operation was
// already processed. Thus, we need to restore the LocalCheckpointTracker bit by bit to keep it consistent with the Lucene
// index. This is not the only option: we could instead bootstrap max_seq_no_of_updates with the max_seq_no of the commit and
// thereby disable the MSU optimization during recovery. Here we prefer to maintain the consistency of the LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted);
}
}
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
}
}

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -665,6 +680,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
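// with soft-deletes, the rebuilt tracker must already contain any seq_no that exists in Lucene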
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
if (op.primaryTerm() > existingTerm) {
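To make the "bit by bit" restore concrete, the following sketch walks through hypothetical values, assuming the LocalCheckpointTracker semantics relied on above (the checkpoint advances only through contiguous completed sequence numbers):

LocalCheckpointTracker tracker = new LocalCheckpointTracker(5, 2); // commit says max_seq_no=5, local_checkpoint=2
tracker.markSeqNoAsCompleted(3); // found by the Lucene scan
tracker.markSeqNoAsCompleted(5); // found by the Lucene scan; seq# 4 never made it into this copy
assert tracker.contains(5);          // a replayed op with seq# 5 is recognized as already processed
assert tracker.contains(4) == false; // an op with seq# 4 still has to be applied
assert tracker.getCheckpoint() == 3; // the checkpoint stops at the first gap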
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -5081,6 +5081,77 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
}
}

public void testRebuildLocalCheckpointTracker() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path translogPath = createTempDir();
int numOps = scaledRandomIntBetween(1, 500);
List<Engine.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
long seqNo = i;
final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
}
}
Randomness.shuffle(operations);
List<List<Engine.Operation>> commits = new ArrayList<>();
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
for (Engine.Operation op : operations) {
flushedOperations.add(op);
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
} else {
engine.noOp((Engine.NoOp) op);
}
if (randomInt(100) < 10) {
engine.refresh("test");
}
if (randomInt(100) < 5) {
engine.flush();
commits.add(new ArrayList<>(flushedOperations));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
trimUnsafeCommits(config);
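// find the safe commit: the newest commit whose operations are all at or below the current global checkpoint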
List<Engine.Operation> safeCommit = null;
for (int i = commits.size() - 1; i >= 0; i--) {
if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
safeCommit = commits.get(i);
break;
}
}
assertThat(safeCommit, notNullValue());
try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
for (Engine.Operation op : operations) {
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
}
}
}
}

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -25,6 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -61,6 +62,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.anyOf;
@@ -681,6 +683,50 @@ public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
}
}

public void testAddNewReplicas() throws Exception {
try (ReplicationGroup shards = createGroup(between(0, 1))) {
shards.startAll();
Thread[] threads = new Thread[between(1, 3)];
AtomicBoolean isStopped = new AtomicBoolean();
boolean appendOnly = randomBoolean();
AtomicInteger docId = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (isStopped.get() == false) {
try {
if (appendOnly) {
String id = randomBoolean() ? Integer.toString(docId.incrementAndGet()) : null;
shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
} else if (frequently()) {
String id = Integer.toString(frequently() ? docId.incrementAndGet() : between(0, 10));
shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
} else {
String id = Integer.toString(between(0, docId.get()));
shards.delete(new DeleteRequest(index.getName(), "type", id));
}
if (randomInt(100) < 10) {
shards.getPrimary().flush(new FlushRequest());
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
threads[i].start();
}
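// let the concurrent writers make progress, then add and recover a new replica while indexing and flushing continue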
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50)));
shards.getPrimary().sync();
IndexShard newReplica = shards.addReplica();
shards.recoverReplica(newReplica);
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100)));
isStopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary()))));
}
}

public static class BlockingTarget extends RecoveryTarget {

private final CountDownLatch recoveryBlocked;
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

@@ -669,6 +669,64 @@ public void testFailOverOnFollower() throws Exception {
unfollowIndex("follower-index");
}

public void testAddNewReplicasOnFollower() throws Exception {
int numberOfReplicas = between(0, 1);
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
PutFollowAction.Request follow = follow("leader-index", "follower-index");
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
ensureFollowerGreen("follower-index");
AtomicBoolean stopped = new AtomicBoolean();
AtomicInteger docID = new AtomicInteger();
boolean appendOnly = randomBoolean();
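// append-only load creates a fresh document on every write; otherwise the load mixes updates of existing ids and deletes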
Thread indexingOnLeader = new Thread(() -> {
while (stopped.get() == false) {
try {
if (appendOnly) {
String id = Integer.toString(docID.incrementAndGet());
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100));
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
indexingOnLeader.start();
Thread flushingOnFollower = new Thread(() -> {
while (stopped.get() == false) {
try {
if (rarely()) {
followerClient().admin().indices().prepareFlush("follower-index").get();
}
if (rarely()) {
followerClient().admin().indices().prepareRefresh("follower-index").get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
flushingOnFollower.start();
atLeastDocsIndexed(followerClient(), "follower-index", 50);
followerClient().admin().indices().prepareUpdateSettings("follower-index")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", 100);
stopped.set(true);
flushingOnFollower.join();
indexingOnLeader.join();
assertSameDocCount("leader-index", "follower-index");
unfollowIndex("follower-index");
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();