Skip to content

Commit f6487e1

Browse files
committed
simulate merge
1 parent e484758 commit f6487e1

File tree

2 files changed

+202
-0
lines changed

2 files changed

+202
-0
lines changed

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Base64;
3131
import java.util.Collections;
3232
import java.util.Comparator;
33+
import java.util.HashMap;
3334
import java.util.HashSet;
3435
import java.util.Iterator;
3536
import java.util.LinkedHashMap;
@@ -68,6 +69,7 @@
6869
import org.apache.lucene.document.StoredField;
6970
import org.apache.lucene.document.TextField;
7071
import org.apache.lucene.index.DirectoryReader;
72+
import org.apache.lucene.index.FilterDirectoryReader;
7173
import org.apache.lucene.index.IndexCommit;
7274
import org.apache.lucene.index.IndexReader;
7375
import org.apache.lucene.index.IndexWriter;
@@ -82,15 +84,23 @@
8284
import org.apache.lucene.index.NoMergePolicy;
8385
import org.apache.lucene.index.NumericDocValues;
8486
import org.apache.lucene.index.PointValues;
87+
import org.apache.lucene.index.ReaderUtil;
8588
import org.apache.lucene.index.SegmentInfos;
8689
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
8790
import org.apache.lucene.index.Term;
8891
import org.apache.lucene.index.TieredMergePolicy;
92+
import org.apache.lucene.search.BooleanClause;
93+
import org.apache.lucene.search.BooleanQuery;
8994
import org.apache.lucene.search.DocIdSetIterator;
95+
import org.apache.lucene.search.DocValuesFieldExistsQuery;
9096
import org.apache.lucene.search.IndexSearcher;
9197
import org.apache.lucene.search.MatchAllDocsQuery;
98+
import org.apache.lucene.search.Query;
9299
import org.apache.lucene.search.ReferenceManager;
100+
import org.apache.lucene.search.ScoreDoc;
93101
import org.apache.lucene.search.Sort;
102+
import org.apache.lucene.search.SortField;
103+
import org.apache.lucene.search.SortedNumericSortField;
94104
import org.apache.lucene.search.SortedSetSortField;
95105
import org.apache.lucene.search.TermQuery;
96106
import org.apache.lucene.search.TopDocs;
@@ -102,6 +112,7 @@
102112
import org.apache.lucene.util.Bits;
103113
import org.apache.lucene.util.BytesRef;
104114
import org.apache.lucene.util.FixedBitSet;
115+
import org.apache.lucene.util.SetOnce;
105116
import org.elasticsearch.ElasticsearchException;
106117
import org.elasticsearch.Version;
107118
import org.elasticsearch.action.index.IndexRequest;
@@ -129,6 +140,7 @@
129140
import org.elasticsearch.common.util.BigArrays;
130141
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
131142
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
143+
import org.elasticsearch.common.util.set.Sets;
132144
import org.elasticsearch.common.xcontent.XContentType;
133145
import org.elasticsearch.core.internal.io.IOUtils;
134146
import org.elasticsearch.index.IndexSettings;
@@ -160,6 +172,7 @@
160172
import org.elasticsearch.index.translog.TranslogConfig;
161173
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
162174
import org.elasticsearch.test.IndexSettingsModule;
175+
import org.elasticsearch.test.junit.annotations.TestLogging;
163176
import org.hamcrest.MatcherAssert;
164177
import org.hamcrest.Matchers;
165178

@@ -5054,6 +5067,181 @@ public void testRecordUpdatedBySeqNo() throws Exception {
50545067
}
50555068
}
50565069

5070+
@TestLogging("_ROOT:DEBUG")
5071+
public void testKeepDocsForRollback() throws Exception {
5072+
IOUtils.close(engine, store);
5073+
threadPool = spy(threadPool);
5074+
AtomicLong clockTime = new AtomicLong();
5075+
when(threadPool.relativeTimeInMillis()).thenAnswer(i -> clockTime.incrementAndGet());
5076+
List<Engine.Operation> operations = new ArrayList<>();
5077+
int numOps = scaledRandomIntBetween(10, 200);
5078+
for (int seqNo = 0; seqNo < numOps; seqNo++) {
5079+
String id = Integer.toString(between(1, 5));
5080+
if (randomBoolean()) {
5081+
ParsedDocument parseDoc = createParsedDoc(id, null);
5082+
operations.add(new Engine.Index(newUid(parseDoc), parseDoc, seqNo, primaryTerm.get(), 1, null,
5083+
REPLICA, between(1, 10), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean()));
5084+
} else {
5085+
operations.add(new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), 1, null,
5086+
REPLICA, between(1, 10)));
5087+
}
5088+
}
5089+
Settings.Builder settings = Settings.builder()
5090+
.put(defaultSettings.getSettings())
5091+
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(between(0, 100)).getStringRep())
5092+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
5093+
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
5094+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
5095+
realisticShuffleOperations(operations);
5096+
long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
5097+
Map<Long, Engine.Operation> processedOps = new HashMap<>();
5098+
SetOnce<IndexWriter> indexWriter = new SetOnce<>();
5099+
IndexWriterFactory indexWriterFactory = (iwc, dir) -> {
5100+
indexWriter.set(new IndexWriter(iwc, dir));
5101+
return indexWriter.get();
5102+
};
5103+
Set<Long> lastTombstones = Collections.emptySet();
5104+
try (Store store = createStore();
5105+
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
5106+
for (Engine.Operation op : operations) {
5107+
Set<Long> tombstones = engine.getDeletedTombstones().stream().map(del -> del.seqNo).collect(Collectors.toSet());
5108+
if (op instanceof Engine.Index) {
5109+
logger.debug("index id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), globalCheckpoint, tombstones);
5110+
engine.index((Engine.Index) op);
5111+
} else if (op instanceof Engine.Delete) {
5112+
logger.debug("delete id={} seq={} gcp={} tombstones={}", op.id(), op.seqNo(), globalCheckpoint, tombstones);
5113+
engine.delete((Engine.Delete) op);
5114+
}
5115+
processedOps.put(op.seqNo(), op);
5116+
if (between(1, 20) == 1) {
5117+
assertDocumentsForRollback(engine, globalCheckpoint, processedOps);
5118+
}
5119+
if (between(1, 5) == 1) {
5120+
engine.maybePruneDeletes();
5121+
}
5122+
if (between(1, 20) == 1) {
5123+
BooleanQuery retentionQuery = new BooleanQuery.Builder()
5124+
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, globalCheckpoint + 1, Long.MAX_VALUE),
5125+
BooleanClause.Occur.SHOULD)
5126+
.add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME, globalCheckpoint + 1, Long.MAX_VALUE),
5127+
BooleanClause.Occur.SHOULD)
5128+
.build();
5129+
List<Engine.Operation> reclaimedOps = simulateMerge(engine, indexWriter.get(), retentionQuery)
5130+
.stream().map(processedOps::get).collect(Collectors.toList());
5131+
for (Engine.Operation reclaimedOp : reclaimedOps) {
5132+
logger.debug("merge reclaim id={} seq={}", reclaimedOp.id(), reclaimedOp.seqNo());
5133+
}
5134+
}
5135+
globalCheckpoint = randomLongBetween(globalCheckpoint, engine.getLocalCheckpoint());
5136+
Set<Long> prunedTombstone = Sets.difference(lastTombstones, tombstones);
5137+
for (long prunedSeqNo : prunedTombstone) {
5138+
logger.debug("prune tombstone id={} seq={}", processedOps.get(prunedSeqNo).id(), prunedSeqNo);
5139+
}
5140+
lastTombstones = tombstones;
5141+
}
5142+
assertDocumentsForRollback(engine, globalCheckpoint, processedOps);
5143+
}
5144+
}
5145+
5146+
/**
5147+
* Here we simulate Lucene merges for these purposes:
5148+
* - The simulation can randomly reclaim a subset of reclaimable operations instead of all docs like the actual merges
5149+
* - The simulation is more deterministic than the actual merge and can return the operations have been reclaimed.
5150+
*
5151+
* @param retentionQuery deleted documents matching this query won't be reclaimed (see {@link SoftDeletesPolicy#getRetentionQuery()}
5152+
* @return a list of operations have been reclaimed
5153+
*/
5154+
private List<Long> simulateMerge(InternalEngine engine, IndexWriter indexWriter, Query retentionQuery) throws IOException {
5155+
try (Searcher engineSearcher = engine.acquireSearcher("simulate-merge", Engine.SearcherScope.INTERNAL)) {
5156+
IndexSearcher searcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
5157+
BooleanQuery reclaimQuery = new BooleanQuery.Builder()
5158+
.add(new DocValuesFieldExistsQuery(Lucene.SOFT_DELETE_FIELD), BooleanClause.Occur.MUST)
5159+
.add(retentionQuery, BooleanClause.Occur.MUST_NOT).build();
5160+
TopDocs reclaimableDocs = searcher.search(reclaimQuery, Integer.MAX_VALUE);
5161+
if (reclaimableDocs.scoreDocs.length == 0) {
5162+
return Collections.emptyList();
5163+
}
5164+
List<ScoreDoc> docsToReclaim = randomSubsetOf(Arrays.asList(reclaimableDocs.scoreDocs));
5165+
DirectoryReader inReader = engineSearcher.getDirectoryReader();
5166+
while (inReader instanceof FilterDirectoryReader) {
5167+
inReader = ((FilterDirectoryReader) inReader).getDelegate();
5168+
}
5169+
List<Long> reclaimedOps = new ArrayList<>();
5170+
for (ScoreDoc docToReclaim : docsToReclaim) {
5171+
if (indexWriter.tryDeleteDocument(inReader, docToReclaim.doc) != -1) {
5172+
reclaimedOps.add(readSeqNo(inReader, docToReclaim.doc));
5173+
}
5174+
}
5175+
return reclaimedOps;
5176+
}
5177+
}
5178+
5179+
/**
5180+
* This assertion asserts that the previous copy of every operation after the global_checkpoint is retained for rollback:
5181+
* 1. If the previous copy is an index, that copy must be retained
5182+
* 2. If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index
5183+
*/
5184+
private void assertDocumentsForRollback(InternalEngine engine, long globalCheckpoint,
5185+
Map<Long, Engine.Operation> processedOps) throws IOException {
5186+
List<Engine.Operation> rollbackOps = processedOps.values().stream()
5187+
.filter(op -> op.seqNo() > globalCheckpoint).collect(Collectors.toList());
5188+
Map<Long, Engine.Operation> previousCopies = new HashMap<>();
5189+
for (Engine.Operation op : rollbackOps) {
5190+
processedOps.values().stream().filter(target -> target.seqNo() < op.seqNo() && target.id().equals(op.id()))
5191+
.forEach(target -> {
5192+
previousCopies.compute(op.seqNo(), (k, v) -> {
5193+
if (v == null || v.seqNo() < target.seqNo()) return target;
5194+
else return v;
5195+
});
5196+
});
5197+
}
5198+
engine.refresh("test", Engine.SearcherScope.INTERNAL);
5199+
try (Searcher engineSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
5200+
DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
5201+
IndexSearcher searcher = new IndexSearcher(reader);
5202+
searcher.setQueryCache(null);
5203+
for (Engine.Operation rollbackOp : rollbackOps) {
5204+
Engine.Operation previousCopy = previousCopies.get(rollbackOp.seqNo());
5205+
if (previousCopy == null) {
5206+
continue;
5207+
}
5208+
BooleanQuery previousQuery = new BooleanQuery.Builder()
5209+
.add(new TermQuery(rollbackOp.uid()), BooleanClause.Occur.FILTER)
5210+
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, 0, rollbackOp.seqNo() - 1), BooleanClause.Occur.FILTER)
5211+
.build();
5212+
TopDocs previousDocs = searcher.search(previousQuery, 1,
5213+
new Sort(new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true)));
5214+
// If the previous copy is an index, that copy must be retained
5215+
if (previousCopy instanceof Engine.Index) {
5216+
assertThat("operation id=" + previousCopy.id() + " seq=" + previousCopy.seqNo() + " has been reclaimed",
5217+
previousDocs.totalHits, greaterThanOrEqualTo(1L));
5218+
long foundSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc);
5219+
assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo(), foundSeqNo, equalTo(previousCopy.seqNo()));
5220+
// If the previous copy is a delete, either that copy or another delete or nothing is retained, but must not an index
5221+
} else {
5222+
if (previousDocs.totalHits > 0) {
5223+
long actualSeqNo = readSeqNo(reader, previousDocs.scoreDocs[0].doc);
5224+
Engine.Operation foundOp = processedOps.get(actualSeqNo);
5225+
assertThat("rollback id=" + rollbackOp.id() + " seq=" + rollbackOp.seqNo() + ", found seq=" + foundOp.seqNo()
5226+
+ ", expected seq=" + previousCopy.seqNo(), foundOp, instanceOf(Engine.Delete.class));
5227+
}
5228+
}
5229+
}
5230+
}
5231+
}
5232+
5233+
private long readSeqNo(DirectoryReader reader, int docId) throws IOException {
5234+
List<LeafReaderContext> leaves = reader.leaves();
5235+
LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docId, leaves));
5236+
int segmentDocId = docId - leaf.docBase;
5237+
NumericDocValues dv = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
5238+
assert dv != null : "SeqNoDV does not exist";
5239+
if (dv.advanceExact(segmentDocId) == false) {
5240+
throw new AssertionError("doc " + docId + " does not have SeqNoDV");
5241+
}
5242+
return dv.longValue();
5243+
}
5244+
50575245
private static void trimUnsafeCommits(EngineConfig config) throws IOException {
50585246
final Store store = config.getStore();
50595247
final TranslogConfig translogConfig = config.getTranslogConfig();

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.cluster.metadata.IndexMetaData;
5050
import org.elasticsearch.cluster.routing.AllocationId;
5151
import org.elasticsearch.common.Nullable;
52+
import org.elasticsearch.common.Randomness;
5253
import org.elasticsearch.common.bytes.BytesArray;
5354
import org.elasticsearch.common.bytes.BytesReference;
5455
import org.elasticsearch.common.lucene.Lucene;
@@ -667,6 +668,19 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
667668
return ops;
668669
}
669670

671+
/**
672+
* Partitions a list of operations into a multiple sub-lists, then shuffles each sub-list.
673+
* This method shuffles operations in a more realistic way than {@link Randomness#shuffle(List)}.
674+
*/
675+
public void realisticShuffleOperations(List<Engine.Operation> operations) {
676+
int index = 0;
677+
while (index < operations.size()) {
678+
int to = Math.min(operations.size(), index + between(10, 20));
679+
Randomness.shuffle(operations.subList(index, to)); // subList is a direct view
680+
index = to;
681+
}
682+
}
683+
670684
public static void assertOpsOnReplica(
671685
final List<Engine.Operation> ops,
672686
final InternalEngine replicaEngine,

0 commit comments

Comments
 (0)