Skip to content

Commit 1b7c4c3

Browse files
committed
Allow _update and upsert to read from the transaction log (#29264)
We historically removed reading from the transaction log to get consistent results from _GET calls. There was also the motivation that the read-modify-update principle we apply should not be hidden from the user. We still agree on the fact that we should not hide these aspects but the impact on updates is quite significant especially if the same documents is updated before it's written to disk and made serachable. This change adds back the ability to read from the transaction log but only for update calls. Calls to the _GET API will always do a refresh if necessary to return consistent results ie. if stored fields or DocValues Fields are requested. Closes #26802
1 parent 5c8315a commit 1b7c4c3

File tree

21 files changed

+601
-45
lines changed

21 files changed

+601
-45
lines changed

server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
9797
if (uidTerm == null) {
9898
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
9999
}
100-
result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm));
100+
result = context.indexShard().get(new Engine.Get(false, false, request.type(), request.id(), uidTerm));
101101
if (!result.exists()) {
102102
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
103103
}
104104
context.parsedQuery(context.getQueryShardContext().toQuery(request.query()));
105105
context.preProcess(true);
106-
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().context.docBase;
106+
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().docBase;
107107
Explanation explanation = context.searcher().explain(context.query(), topLevelDocId);
108108
for (RescoreContext ctx : context.rescore()) {
109109
Rescorer rescorer = ctx.rescorer();

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.script.ExecutableScript;
4848
import org.elasticsearch.script.Script;
4949
import org.elasticsearch.script.ScriptService;
50-
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
5150
import org.elasticsearch.search.lookup.SourceLookup;
5251

5352
import java.io.IOException;
@@ -71,9 +70,8 @@ public UpdateHelper(Settings settings, ScriptService scriptService) {
7170
* Prepares an update request by converting it into an index or delete request or an update response (no action).
7271
*/
7372
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
74-
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
75-
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME},
76-
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
73+
final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(),
74+
request.versionType());
7775
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
7876
}
7977

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
100100
if (versions.advanceExact(docID) == false) {
101101
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
102102
}
103-
return new DocIdAndVersion(docID, versions.longValue(), context);
103+
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
104104
} else {
105105
return null;
106106
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.lucene.uid;
2121

2222
import org.apache.lucene.index.IndexReader;
23+
import org.apache.lucene.index.LeafReader;
2324
import org.apache.lucene.index.LeafReaderContext;
2425
import org.apache.lucene.index.NumericDocValues;
2526
import org.apache.lucene.index.Term;
@@ -97,12 +98,14 @@ private VersionsAndSeqNoResolver() {
9798
public static class DocIdAndVersion {
9899
public final int docId;
99100
public final long version;
100-
public final LeafReaderContext context;
101+
public final LeafReader reader;
102+
public final int docBase;
101103

102-
DocIdAndVersion(int docId, long version, LeafReaderContext context) {
104+
public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
103105
this.docId = docId;
104106
this.version = version;
105-
this.context = context;
107+
this.reader = reader;
108+
this.docBase = docBase;
106109
}
107110
}
108111

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1232,14 +1232,16 @@ public static class Get {
12321232
private final boolean realtime;
12331233
private final Term uid;
12341234
private final String type, id;
1235+
private final boolean readFromTranslog;
12351236
private long version = Versions.MATCH_ANY;
12361237
private VersionType versionType = VersionType.INTERNAL;
12371238

1238-
public Get(boolean realtime, String type, String id, Term uid) {
1239+
public Get(boolean realtime, boolean readFromTranslog, String type, String id, Term uid) {
12391240
this.realtime = realtime;
12401241
this.type = type;
12411242
this.id = id;
12421243
this.uid = uid;
1244+
this.readFromTranslog = readFromTranslog;
12431245
}
12441246

12451247
public boolean realtime() {
@@ -1275,6 +1277,10 @@ public Get versionType(VersionType versionType) {
12751277
this.versionType = versionType;
12761278
return this;
12771279
}
1280+
1281+
public boolean isReadFromTranslog() {
1282+
return readFromTranslog;
1283+
}
12781284
}
12791285

12801286
public static class GetResult implements Releasable {

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public class InternalEngine extends Engine {
146146
* being indexed/deleted.
147147
*/
148148
private final AtomicLong writingBytes = new AtomicLong();
149+
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
149150

150151
@Nullable
151152
private final String historyUUID;
@@ -663,6 +664,27 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
663664
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
664665
get.versionType().explainConflictForReads(versionValue.version, get.version()));
665666
}
667+
if (get.isReadFromTranslog()) {
668+
// this is only used for updates - API _GET calls will always read form a reader for consistency
669+
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
670+
if (versionValue.getLocation() != null) {
671+
try {
672+
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
673+
if (operation != null) {
674+
// in the case of a already pruned translog generation we might get null here - yet very unlikely
675+
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
676+
.getIndexSettings().getIndexVersionCreated());
677+
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
678+
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
679+
}
680+
} catch (IOException e) {
681+
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
682+
throw new EngineException(shardId, "failed to read operation from translog", e);
683+
}
684+
} else {
685+
trackTranslogLocation.set(true);
686+
}
687+
}
666688
refresh("realtime_get", SearcherScope.INTERNAL);
667689
}
668690
scope = SearcherScope.INTERNAL;
@@ -921,6 +943,10 @@ public IndexResult index(Index index) throws IOException {
921943
}
922944
indexResult.setTranslogLocation(location);
923945
}
946+
if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
947+
versionMap.maybePutUnderLock(index.uid().bytes(),
948+
getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation()));
949+
}
924950
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
925951
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
926952
}
@@ -1040,8 +1066,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10401066
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
10411067
index(index.docs(), indexWriter);
10421068
}
1043-
versionMap.maybePutUnderLock(index.uid().bytes(),
1044-
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
10451069
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
10461070
} catch (Exception ex) {
10471071
if (indexWriter.getTragicException() == null) {
@@ -1065,6 +1089,13 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10651089
}
10661090
}
10671091

1092+
private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) {
1093+
if (location != null && trackTranslogLocation.get()) {
1094+
return new TranslogVersionValue(location, version, seqNo, term);
1095+
}
1096+
return new VersionValue(version, seqNo, term);
1097+
}
1098+
10681099
/**
10691100
* returns true if the indexing operation may have already be processed by this engine.
10701101
* Note that it is OK to rarely return true even if this is not the case. However a `false`

0 commit comments

Comments
 (0)