19 changes: 11 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -628,7 +628,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the
// responsibility of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
return new GetResult(searcher, docIdAndVersion, false);
} else {
Releasables.close(searcher);
return GetResult.NOT_EXISTS;
@@ -1621,21 +1621,20 @@ public static class GetResult implements Releasable {
private final long version;
private final DocIdAndVersion docIdAndVersion;
private final Engine.Searcher searcher;
private final boolean fromTranslog;

public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null, false);

private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) {
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
this.exists = exists;
this.version = version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this.fromTranslog = fromTranslog;
}

/**
* Build a non-realtime get result from the searcher.
*/
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
}

public boolean exists() {
@@ -1646,6 +1645,10 @@ public long version() {
return this.version;
}

public boolean isFromTranslog() {
return fromTranslog;
}

public Engine.Searcher searcher() {
return this.searcher;
}
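For orientation, a minimal caller-side sketch (not part of the diff) of how the reworked GetResult is meant to be consumed: the extra constructor argument records whether the hit was served from the translog, and, as the comment in getFromSearcher notes, the caller still owns releasing the result. The engine, get request, and searcher factory are assumed to be in scope.

```java
// Sketch only (assumed caller, not from the PR): consuming Engine.GetResult with the new flag.
import java.util.function.BiFunction;
import org.elasticsearch.index.engine.Engine;

class GetResultConsumerSketch {
    static boolean servedFromTranslog(Engine engine, Engine.Get get,
            BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory) {
        Engine.GetResult result = engine.get(get, searcherFactory);
        try {
            // isFromTranslog() should only ever be true when reading from the translog was
            // explicitly requested, which the new assertion in the shard get path below also checks.
            return result.exists() && result.isFromTranslog();
        } finally {
            result.close(); // GetResult is Releasable; closing it releases the wrapped searcher
        }
    }
}
```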
@@ -663,7 +663,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
return new GetResult(new Engine.Searcher("realtime_get", reader,
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
reader, 0));
reader, 0), true);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
@@ -162,6 +162,7 @@ private GetResult innerGet(String id, String[] gFields, boolean realtime, long v
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
Engine.GetResult get = indexShard.get(new Engine.Get(realtime, readFromTranslog, id, uidTerm)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
if (get.exists() == false) {
get.close();
}
@@ -223,12 +224,22 @@ private GetResult innerGetLoadFromStoredFields(String id, String[] gFields, Fetc

if (!fetchSourceContext.fetchSource()) {
source = null;
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
}

if (source != null && get.isFromTranslog()) {
// reapply source filters from mapping (possibly also nulling the source)
try {
source = docMapper.sourceMapper().applyFilters(source, null);
} catch (IOException e) {
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
}
}

if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
Map<String, Object> sourceAsMap;
XContentType sourceContentType = null;
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
sourceContentType = typeMapTuple.v1();
XContentType sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
try {
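The reason for the new isFromTranslog() branch in innerGetLoadFromStoredFields: stored _source has mapping-level filtering (enabled/includes/excludes) applied at index time, while the translog keeps the original client-supplied document, so a realtime get served from the translog would otherwise return unfiltered source. A rough illustration of the intended outcome, reusing the same filtering helper the method already applies for request-level includes/excludes; the concrete values mirror the new test further down and are not code from the PR.

```java
// Illustration only (assumes a mapping with "_source": { "excludes": ["fo*"] }, as in the test below).
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

class TranslogSourceFilteringSketch {
    static Map<String, Object> expectedFilteredSource() {
        Map<String, Object> original = new HashMap<>(); // what the translog holds: the full document
        original.put("foo", "foo");
        original.put("bar", "bar");
        // After the fix, a realtime get served from the translog should expose the same view
        // a stored-fields get would: the mapping-level excludes applied to the original source.
        return XContentMapValues.filter(original, new String[0], new String[] {"fo*"}); // -> {bar=bar}
    }
}
```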
@@ -25,6 +25,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@@ -227,33 +228,43 @@ public void parse(ParseContext context) throws IOException {
@Override
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
BytesReference originalSource = context.sourceToParse().source();
BytesReference source = originalSource;
if (enabled && fieldType().stored() && source != null) {
XContentType contentType = context.sourceToParse().getXContentType();
final BytesReference adaptedSource = applyFilters(originalSource, contentType);

if (adaptedSource != null) {
final BytesRef ref = adaptedSource.toBytesRef();
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}

@Nullable
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
if (enabled && fieldType().stored() && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<XContentType, Map<String, Object>> mapTuple =
XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType());
XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
XContentType contentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource);
XContentType actualContentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
builder.close();
source = bStream.bytes();
return bStream.bytes();
} else {
return originalSource;
}
BytesRef ref = source.toBytesRef();
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
} else {
source = null;
return null;
}

if (originalSource != null && source != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}
}

@Override
protected String contentType() {
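applyFilters is now public, annotated @Nullable, and decoupled from ParseContext, which is what lets the realtime-get path above reapply mapping-level filtering to translog source. A short sketch of that reuse under the same assumptions (a DocumentMapper is available; a null content type is passed, as the get path does):

```java
// Sketch only (not from the PR): reusing the extracted applyFilters outside document parsing.
import java.io.IOException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.mapper.DocumentMapper;

class ApplyFiltersSketch {
    static BytesReference filterTranslogSource(DocumentMapper docMapper, BytesReference translogSource) throws IOException {
        // Null content type mirrors the call in the get path above.
        BytesReference filtered = docMapper.sourceMapper().applyFilters(translogSource, null);
        // null     -> _source is disabled in the mapping (or there was nothing to filter)
        // same ref -> no mapping-level includes/excludes are configured
        // new ref  -> the filtered source that would also have been stored at index time
        return filtered;
    }
}
```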
@@ -89,6 +89,34 @@ public void testGetForUpdate() throws IOException {
closeShards(primary);
}

public void testGetFromTranslogWithSourceMappingOptions() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
String docToIndex = "{\"foo\" : \"foo\", \"bar\" : \"bar\"}";
boolean noSource = randomBoolean();
String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}, \"bar\": { \"type\": \"text\"}}, \"_source\": { "
+ sourceOptions + "}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex);
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(new String(testGet.source() == null ? new byte[0] : testGet.source(), StandardCharsets.UTF_8), expectedResult);
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
}

closeShards(primary);
}

public void testTypelessGetForUpdate() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)