Skip to content

Commit 5c6711b

Browse files
authored
Use a _recovery_source if source is omitted or modified (#31106)
Today, if a user omits the `_source` entirely or modifies the source on indexing, we have no chance to re-create the document after it has been added. This is an issue for CCR and for recovery based on soft deletes, which we are going to make the default. This change adds an additional recovery source if the source is disabled or modified, which is only kept around until the document leaves the retention policy window. This change adds a merge policy that efficiently removes this extra source on merge for all documents that are live and no longer in the retention policy window.
1 parent 20a2f64 commit 5c6711b

File tree

19 files changed

+687
-53
lines changed

19 files changed

+687
-53
lines changed

modules/percolator/src/test/java/org/elasticsearch/percolator/CandidateQueryTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.lucene.store.RAMDirectory;
7878
import org.apache.lucene.util.BytesRef;
7979
import org.elasticsearch.Version;
80+
import org.elasticsearch.cluster.metadata.IndexMetaData;
8081
import org.elasticsearch.common.CheckedFunction;
8182
import org.elasticsearch.common.Strings;
8283
import org.elasticsearch.common.bytes.BytesArray;
@@ -87,6 +88,7 @@
8788
import org.elasticsearch.common.settings.Settings;
8889
import org.elasticsearch.common.xcontent.XContentFactory;
8990
import org.elasticsearch.index.IndexService;
91+
import org.elasticsearch.index.IndexSettings;
9092
import org.elasticsearch.index.mapper.DocumentMapper;
9193
import org.elasticsearch.index.mapper.MappedFieldType;
9294
import org.elasticsearch.index.mapper.MapperService;
@@ -1111,7 +1113,11 @@ private void duelRun(PercolateQuery.QueryStore queryStore, MemoryIndex memoryInd
11111113
}
11121114

11131115
private void addQuery(Query query, List<ParseContext.Document> docs) {
1114-
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
1116+
IndexMetaData build = IndexMetaData.builder("")
1117+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
1118+
.numberOfShards(1).numberOfReplicas(0).build();
1119+
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
1120+
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
11151121
mapperService.documentMapperParser(), documentMapper, null, null);
11161122
fieldMapper.processQuery(query, parseContext);
11171123
ParseContext.Document queryDocument = parseContext.doc();

modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorFieldMapperTests.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.lucene.util.BytesRef;
4343
import org.elasticsearch.Version;
4444
import org.elasticsearch.action.support.PlainActionFuture;
45+
import org.elasticsearch.cluster.metadata.IndexMetaData;
4546
import org.elasticsearch.common.Strings;
4647
import org.elasticsearch.common.bytes.BytesArray;
4748
import org.elasticsearch.common.bytes.BytesReference;
@@ -58,6 +59,7 @@
5859
import org.elasticsearch.common.xcontent.XContentFactory;
5960
import org.elasticsearch.common.xcontent.XContentType;
6061
import org.elasticsearch.index.IndexService;
62+
import org.elasticsearch.index.IndexSettings;
6163
import org.elasticsearch.index.mapper.DocumentMapper;
6264
import org.elasticsearch.index.mapper.DocumentMapperParser;
6365
import org.elasticsearch.index.mapper.MapperParsingException;
@@ -182,7 +184,11 @@ public void testExtractTerms() throws Exception {
182184

183185
DocumentMapper documentMapper = mapperService.documentMapper("doc");
184186
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
185-
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
187+
IndexMetaData build = IndexMetaData.builder("")
188+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
189+
.numberOfShards(1).numberOfReplicas(0).build();
190+
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
191+
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
186192
mapperService.documentMapperParser(), documentMapper, null, null);
187193
fieldMapper.processQuery(bq.build(), parseContext);
188194
ParseContext.Document document = parseContext.doc();
@@ -204,7 +210,7 @@ public void testExtractTerms() throws Exception {
204210
bq.add(termQuery1, Occur.MUST);
205211
bq.add(termQuery2, Occur.MUST);
206212

207-
parseContext = new ParseContext.InternalParseContext(Settings.EMPTY, mapperService.documentMapperParser(),
213+
parseContext = new ParseContext.InternalParseContext(settings, mapperService.documentMapperParser(),
208214
documentMapper, null, null);
209215
fieldMapper.processQuery(bq.build(), parseContext);
210216
document = parseContext.doc();
@@ -232,8 +238,12 @@ public void testExtractRanges() throws Exception {
232238
bq.add(rangeQuery2, Occur.MUST);
233239

234240
DocumentMapper documentMapper = mapperService.documentMapper("doc");
241+
IndexMetaData build = IndexMetaData.builder("")
242+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
243+
.numberOfShards(1).numberOfReplicas(0).build();
244+
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
235245
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
236-
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
246+
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
237247
mapperService.documentMapperParser(), documentMapper, null, null);
238248
fieldMapper.processQuery(bq.build(), parseContext);
239249
ParseContext.Document document = parseContext.doc();
@@ -259,7 +269,7 @@ public void testExtractRanges() throws Exception {
259269
.rangeQuery(15, 20, true, true, null, null, null, null);
260270
bq.add(rangeQuery2, Occur.MUST);
261271

262-
parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
272+
parseContext = new ParseContext.InternalParseContext(settings,
263273
mapperService.documentMapperParser(), documentMapper, null, null);
264274
fieldMapper.processQuery(bq.build(), parseContext);
265275
document = parseContext.doc();
@@ -283,7 +293,11 @@ public void testExtractTermsAndRanges_failed() throws Exception {
283293
TermRangeQuery query = new TermRangeQuery("field1", new BytesRef("a"), new BytesRef("z"), true, true);
284294
DocumentMapper documentMapper = mapperService.documentMapper("doc");
285295
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
286-
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
296+
IndexMetaData build = IndexMetaData.builder("")
297+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
298+
.numberOfShards(1).numberOfReplicas(0).build();
299+
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
300+
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
287301
mapperService.documentMapperParser(), documentMapper, null, null);
288302
fieldMapper.processQuery(query, parseContext);
289303
ParseContext.Document document = parseContext.doc();
@@ -298,7 +312,11 @@ public void testExtractTermsAndRanges_partial() throws Exception {
298312
PhraseQuery phraseQuery = new PhraseQuery("field", "term");
299313
DocumentMapper documentMapper = mapperService.documentMapper("doc");
300314
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
301-
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
315+
IndexMetaData build = IndexMetaData.builder("")
316+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
317+
.numberOfShards(1).numberOfReplicas(0).build();
318+
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
319+
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
302320
mapperService.documentMapperParser(), documentMapper, null, null);
303321
fieldMapper.processQuery(phraseQuery, parseContext);
304322
ParseContext.Document document = parseContext.doc();

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.index.mapper.ParseContext;
7373
import org.elasticsearch.index.mapper.ParsedDocument;
7474
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
75+
import org.elasticsearch.index.mapper.SourceFieldMapper;
7576
import org.elasticsearch.index.merge.MergeStats;
7677
import org.elasticsearch.index.merge.OnGoingMerge;
7778
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -2013,7 +2014,8 @@ private IndexWriterConfig getIndexWriterConfig() {
20132014
MergePolicy mergePolicy = config().getMergePolicy();
20142015
if (softDeleteEnabled) {
20152016
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
2016-
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
2017+
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery,
2018+
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy));
20172019
}
20182020
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
20192021
iwc.setSimilarity(engineConfig.getSimilarity());

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.mapper.IdFieldMapper;
4141
import org.elasticsearch.index.mapper.MapperService;
4242
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
43+
import org.elasticsearch.index.mapper.SourceFieldMapper;
4344
import org.elasticsearch.index.mapper.Uid;
4445
import org.elasticsearch.index.mapper.VersionFieldMapper;
4546
import org.elasticsearch.index.translog.Translog;
@@ -196,7 +197,9 @@ private Translog.Operation readDocAsOp(int docID) throws IOException {
196197
return null;
197198
}
198199
final long version = docValues[leaf.ord].docVersion(segmentDocID);
199-
final FieldsVisitor fields = new FieldsVisitor(true);
200+
final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
201+
SourceFieldMapper.NAME;
202+
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
200203
indexSearcher.doc(docID, fields);
201204
fields.postProcess(mapperService);
202205

@@ -218,7 +221,7 @@ private Translog.Operation readDocAsOp(int docID) throws IOException {
218221
// TODO: pass the latest timestamp from engine.
219222
final long autoGeneratedIdTimestamp = -1;
220223
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
221-
source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
224+
source == null ? null : source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
222225
}
223226
}
224227
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
@@ -240,6 +243,7 @@ private static final class CombinedDocValues {
240243
private NumericDocValues seqNoDV;
241244
private NumericDocValues primaryTermDV;
242245
private NumericDocValues tombstoneDV;
246+
private NumericDocValues recoverySource;
243247

244248
CombinedDocValues(LeafReader leafReader) throws IOException {
245249
this.leafReader = leafReader;
@@ -248,6 +252,7 @@ private static final class CombinedDocValues {
248252
this.primaryTermDV = Objects.requireNonNull(
249253
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
250254
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
255+
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
251256
}
252257

253258
long docVersion(int segmentDocId) throws IOException {
@@ -293,5 +298,15 @@ boolean isTombstone(int segmentDocId) throws IOException {
293298
}
294299
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
295300
}
301+
302+
boolean hasRecoverySource(int segmentDocId) throws IOException {
303+
if (recoverySource == null) {
304+
return false;
305+
}
306+
if (recoverySource.docID() > segmentDocId) {
307+
recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
308+
}
309+
return recoverySource.advanceExact(segmentDocId);
310+
}
296311
}
297312
}

0 commit comments

Comments
 (0)