Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 113 additions & 85 deletions server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -115,6 +118,11 @@ synchronized long getMinRetainedSeqNo() {
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/
Query getRetentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
return new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
// Since updated_by_seqno is an updatable DV, we have to do a linear scan to find matches of its range query.
.add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME,
globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public static SequenceIDFields emptySeqID() {
public static final String CONTENT_TYPE = "_seq_no";
public static final String PRIMARY_TERM_NAME = "_primary_term";
public static final String TOMBSTONE_NAME = "_tombstone";
public static final String UPDATED_BY_SEQNO_NAME = "_updated_by_seqno";

public static class SeqNoDefaults {
public static final String NAME = SeqNoFieldMapper.NAME;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class LuceneChangesSnapshotTests extends EngineTestCase {
private MapperService mapperService;
Expand Down Expand Up @@ -187,7 +188,7 @@ public void testDedupByPrimaryTerm() throws Exception {
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
}
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(totalOps - latestOperations.size()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public void testCheckpointsAdvance() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31637")
public void testConflictingOpsOnReplica() throws Exception {
Map<String, String> mappings =
Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -619,15 +620,24 @@ protected static void assertVisibleCount(InternalEngine engine, int numDocs, boo

public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
long primaryTerm, int minOpCount, int maxOpCount, String docId) {
return generateMultipleDocumentsHistory(forReplica, versionType, primaryTerm, minOpCount, maxOpCount,
Collections.singletonList(docId), true, System::currentTimeMillis);
}

public static List<Engine.Operation> generateMultipleDocumentsHistory(boolean forReplica, VersionType versionType, long primaryTerm,
int minOpCount, int maxOpCount, List<String> docIds,
boolean allowGapInSeqNo, LongSupplier currentTimeInMsSupplier) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid(docId);
final int startWithSeqNo = 0;
final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
final int seqNoGap = allowGapInSeqNo && randomBoolean() ? 2 : 1;
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
final long version;
String docId = randomFrom(docIds);
final Term id = newUid(docId);
final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_";
switch (versionType) {
case INTERNAL:
version = forReplica ? i : Versions.MATCH_ANY;
Expand All @@ -646,27 +656,40 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? null : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
currentTimeInMsSupplier.getAsLong(), -1, false
);
} else {
op = new Engine.Delete("test", docId, id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? null : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
currentTimeInMsSupplier.getAsLong());
}
ops.add(op);
}
return ops;
}

/**
* Partitions a list of operations into a multiple sub-lists, then shuffles each sub-list.
* This method shuffles operations in a more realistic way than {@link Randomness#shuffle(List)}.
*/
public void realisticShuffleOperations(List<Engine.Operation> operations) {
int index = 0;
while (index < operations.size()) {
int to = Math.min(operations.size(), index + between(10, 50));
Randomness.shuffle(operations.subList(index, to)); // subList is a direct view
index = to;
}
}

public static void assertOpsOnReplica(
final List<Engine.Operation> ops,
final InternalEngine replicaEngine,
Expand Down