@@ -929,7 +929,8 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
} else {
    final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
    if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
-       plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
+       boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(index.seqNo()) == false;
+       plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version());
    } else {
        plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
            index.seqNo(), index.version());
@@ -1258,7 +1259,8 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
} else {
    final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
    if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
-       plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
+       boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(delete.seqNo()) == false;
+       plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version());
    } else {
        plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
            delete.seqNo(), delete.version());
@@ -1400,7 +1402,9 @@ public void maybePruneDeletes() {
@Override
public NoOpResult noOp(final NoOp noOp) {
    NoOpResult noOpResult;
-   try (ReleasableLock ignored = readLock.acquire()) {
+   try (ReleasableLock ignored = readLock.acquire();
+        // prevent two noOps with the same seqNo from getting in at the same time
+        Releasable uidLock = versionMap.acquireLock(new BytesRef(Long.toString(noOp.seqNo())))) {
        noOpResult = innerNoOp(noOp);
    } catch (final Exception e) {
        noOpResult = new NoOpResult(noOp.seqNo(), e);
@@ -1414,7 +1418,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
final long seqNo = noOp.seqNo();
try {
    Exception failure = null;
-   if (softDeleteEnabled) {
+   if (softDeleteEnabled && localCheckpointTracker.isProcessed(noOp.seqNo()) == false) {
        try {
            final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
            tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
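
Taken together, the four InternalEngine hunks above make redelivery of a replica operation idempotent with respect to Lucene: a stale index or delete, or a noop tombstone, is only written when the local checkpoint tracker has not yet processed that sequence number, so the same operation arriving twice (for example via a retried replication request or a recovery replay) no longer produces a duplicate soft-deleted document. The sketch below isolates that guard; StaleOpGuardDemo and SeqNoTracker are hypothetical stand-ins, not the real engine classes.

import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the "add a stale op to Lucene only once" guard.
// StaleOpGuardDemo and SeqNoTracker are stand-ins, not real Elasticsearch classes.
class StaleOpGuardDemo {
    /** Stand-in for LocalCheckpointTracker#isProcessed. */
    interface SeqNoTracker {
        boolean isProcessed(long seqNo);
    }

    private final Set<Long> luceneDocs = new HashSet<>();   // one entry per doc actually written

    /** Returns true if a (soft-deleted) stale doc was written for this delivery of the op. */
    boolean indexStaleOp(boolean softDeleteEnabled, long seqNo, SeqNoTracker tracker) {
        // Before the change the op was added whenever soft deletes were enabled, so a second
        // delivery of the same seqNo produced a duplicate stale doc. The added check skips it.
        final boolean addStaleOpToLucene = softDeleteEnabled && tracker.isProcessed(seqNo) == false;
        if (addStaleOpToLucene) {
            luceneDocs.add(seqNo);
        }
        return addStaleOpToLucene;
    }

    public static void main(String[] args) {
        final StaleOpGuardDemo demo = new StaleOpGuardDemo();
        final Set<Long> processed = new HashSet<>();
        final SeqNoTracker tracker = processed::contains;

        System.out.println(demo.indexStaleOp(true, 7L, tracker));  // true: first delivery is written
        processed.add(7L);                                         // seqNo 7 is now processed
        System.out.println(demo.indexStaleOp(true, 7L, tracker));  // false: redelivery is skipped
        System.out.println(demo.luceneDocs.size());                // 1: no duplicate stale doc
    }
}

The real change evaluates localCheckpointTracker.isProcessed(...) under the engine's existing locks, as the diff shows; the sketch only demonstrates why the boolean guard prevents duplicates.
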
@@ -103,6 +103,21 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Checks if the given sequence number has been processed (and tracked) in this tracker.
*/
public synchronized boolean isProcessed(long seqNo) {
if (seqNo <= checkpoint) {
return true;
}
if (seqNo >= nextSeqNo) {
return false;
}
final long bitSetKey = getBitSetKey(seqNo);
final CountedBitSet bitSet = processedSeqNo.get(bitSetKey);
return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo));
}

/**
* Resets the checkpoint to the specified value.
*
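
For context on the new method: isProcessed() answers in three steps, all visible above. Anything at or below the checkpoint is complete, anything at or above nextSeqNo has never been marked, and the gap region in between is looked up in a bit set keyed by the seqNo's window (getBitSetKey) and its offset within that window (seqNoToBitSetOffset). The following is a simplified, self-contained stand-in for that bookkeeping, assuming a window-size constant and plain java.util.BitSet instead of CountedBitSet; the real tracker also prunes windows once the checkpoint passes them, which is omitted here.

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the checkpoint/bit-set bookkeeping behind isProcessed().
// Hypothetical class, not the real org.elasticsearch.index.seqno.LocalCheckpointTracker:
// windows are never pruned here, and CountedBitSet is replaced by java.util.BitSet.
class SimpleSeqNoTracker {
    static final int BIT_SET_SIZE = 1024;              // size of one tracking window
    private long checkpoint = -1;                      // highest seqNo with no gaps below it
    private long nextSeqNo = 0;                        // one above the highest seqNo seen so far
    private final Map<Long, BitSet> processedSeqNo = new HashMap<>();

    synchronized void markSeqNoAsCompleted(long seqNo) {
        nextSeqNo = Math.max(nextSeqNo, seqNo + 1);
        processedSeqNo.computeIfAbsent(seqNo / BIT_SET_SIZE, k -> new BitSet(BIT_SET_SIZE))
                      .set((int) (seqNo % BIT_SET_SIZE));
        while (isMarked(checkpoint + 1)) {             // advance the contiguous checkpoint
            checkpoint++;
        }
    }

    synchronized boolean isProcessed(long seqNo) {
        if (seqNo <= checkpoint) {
            return true;                               // everything up to the checkpoint is complete
        }
        if (seqNo >= nextSeqNo) {
            return false;                              // nothing this high has been marked yet
        }
        return isMarked(seqNo);                        // consult the bit-set window for the gap region
    }

    private boolean isMarked(long seqNo) {
        final BitSet bits = processedSeqNo.get(seqNo / BIT_SET_SIZE);
        return bits != null && bits.get((int) (seqNo % BIT_SET_SIZE));
    }
}

The appeal of this layout is bounded memory: the dense prefix at or below the checkpoint needs no per-seqNo state, and bit sets only cover the sparse region of out-of-order completions above it.
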
@@ -139,6 +139,7 @@
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -4947,6 +4948,44 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
}
}

public void testDoNotIndexDuplicateStaleDocsToLucene() throws Exception {
int numOps = scaledRandomIntBetween(10, 200);
List<Engine.Operation> ops = new ArrayList<>();
Map<String, Long> versions = new HashMap<>();
for (int seqNo = 0; seqNo < numOps; seqNo++) {
String id = Integer.toString(randomIntBetween(1, 5));
long version = versions.compute(id, (k, v) -> (v == null ? 1 : v) + between(1, 10));
int copies = between(1, 3);
if (randomBoolean()) {
for (int i = 0; i < copies; i++) {
ops.add(replicaIndexForDoc(createParsedDoc(id, null), version, seqNo, randomBoolean()));
}
} else if (frequently()) {
for (int i = 0; i < copies; i++) {
ops.add(replicaDeleteForDoc(id, version, seqNo, randomNonNegativeLong()));
}
} else {
for (int i = 0; i < copies; i++) {
ops.add(new Engine.NoOp(seqNo, primaryTerm.get(), REPLICA, randomNonNegativeLong(), "test-" + seqNo));
}
}
}
Settings settings = Settings.builder().put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
try (Store store = createStore();
InternalEngine engine = createEngine(
config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), newMergePolicy(), null))) {
Randomness.shuffle(ops);
concurrentlyApplyOps(ops, engine);
engine.refresh("test", Engine.SearcherScope.INTERNAL);
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertThat(searcher.reader().maxDoc(), equalTo(numOps));
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
}

private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
@@ -77,12 +77,20 @@ public void testSimplePrimary() {

public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(tracker.isProcessed(randomNonNegativeLong()), equalTo(false));
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.isProcessed(0L), equalTo(true));
assertThat(tracker.isProcessed(between(1, Integer.MAX_VALUE)), equalTo(false));
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.isProcessed(1L), equalTo(false));
assertThat(tracker.isProcessed(2L), equalTo(true));
assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false));
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.isProcessed(between(0, 2)), equalTo(true));
assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false));
}

public void testLazyInitialization() {
Expand Down Expand Up @@ -199,9 +207,14 @@ protected void doRun() throws Exception {
}
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
assertThat(tracker.isProcessed(randomValueOtherThan((int) unFinishedSeq, () -> randomFrom(seqNos))), equalTo(true));
assertThat(tracker.isProcessed(unFinishedSeq), equalTo(false));
assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.isProcessed(randomFrom(seqNos)), equalTo(true));
assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}