Skip to content

Commit a0becd2

Browse files
authored
Optimize indexing for the autogenerated ID append-only case (#20211)
If elasticsearch controls the ID values as well as the documents' version, we can optimize the code that adds / appends the documents to the index. Essentially we can skip the version lookup for all documents unless the same document is delivered more than once. On the lucene level we can simply call IndexWriter#addDocument instead of #updateDocument, but on the Engine level we need to ensure that we deoptimize the case once we see the same document more than once. This is done as follows: 1. Mark every request with a timestamp. This is done once on the first node that receives a request and is fixed for this request. This can even be the machine local time (see why later). The important part is that retry requests will have the same value as the original one. 2. In the engine we make sure we keep the highest seen timestamp of "retry" requests. This is updated while the retry request has its doc id lock. Call this `maxUnsafeAutoIdTimestamp`. 3. When the engine runs an "optimized" request, it compares the request's timestamp with the current `maxUnsafeAutoIdTimestamp` (but doesn't update it). If the request timestamp is higher, it is safe to execute it as optimized (no retry request with the same timestamp has been run before). If not, we fall back to "non-optimized" mode, run the request as a retry one, and update the `maxUnsafeAutoIdTimestamp` unless it has already been updated to a higher value. Relates to #19813
1 parent 419627c commit a0becd2

File tree

34 files changed

+783
-160
lines changed

34 files changed

+783
-160
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,6 @@
386386
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MapperService.java" checks="LineLength" />
387387
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]Mapping.java" checks="LineLength" />
388388
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MetadataFieldMapper.java" checks="LineLength" />
389-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]ParsedDocument.java" checks="LineLength" />
390389
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]CompletionFieldMapper.java" checks="LineLength" />
391390
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDateFieldMapper.java" checks="LineLength" />
392391
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDoubleFieldMapper.java" checks="LineLength" />
@@ -865,7 +864,6 @@
865864
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedFileIT.java" checks="LineLength" />
866865
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedTranslogIT.java" checks="LineLength" />
867866
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]DirectoryUtilsTests.java" checks="LineLength" />
868-
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]ExceptionRetryIT.java" checks="LineLength" />
869867
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]IndexStoreTests.java" checks="LineLength" />
870868
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]StoreTests.java" checks="LineLength" />
871869
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]suggest[/\\]stats[/\\]SuggestStatsIT.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.bulk;
2121

2222
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
23+
import org.elasticsearch.action.support.replication.ReplicationRequest;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526
import org.elasticsearch.index.shard.ShardId;
@@ -101,4 +102,15 @@ public String toString() {
101102
}
102103
return b.toString();
103104
}
105+
106+
@Override
107+
public void onRetry() {
108+
for (BulkItemRequest item : items) {
109+
if (item.request() instanceof ReplicationRequest) {
110+
// all replication requests need to be notified here as well, e.g. to make sure that internal optimizations are
111+
// disabled see IndexRequest#canHaveDuplicates()
112+
((ReplicationRequest) item.request()).onRetry();
113+
}
114+
}
115+
}
104116
}

core/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
7272
/**
7373
* Operation type controls if the type of the index operation.
7474
*/
75-
public static enum OpType {
75+
public enum OpType {
7676
/**
7777
* Index the source. If there an existing document with the id, it will
7878
* be replaced.
@@ -152,6 +152,17 @@ public static OpType fromString(String sOpType) {
152152

153153
private String pipeline;
154154

155+
/**
156+
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an externally
157+
* provided ID.
158+
*/
159+
public static final int UNSET_AUTO_GENERATED_TIMESTAMP = -1;
160+
161+
private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
162+
163+
private boolean isRetry = false;
164+
165+
155166
public IndexRequest() {
156167
}
157168

@@ -202,6 +213,10 @@ public ActionRequestValidationException validate() {
202213
}
203214
}
204215

216+
if (opType() != OpType.INDEX && id == null) {
217+
addValidationError("an id is required for a " + opType() + " operation", validationException);
218+
}
219+
205220
if (!versionType.validateVersionForWrites(version)) {
206221
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
207222
}
@@ -216,6 +231,11 @@ public ActionRequestValidationException validate() {
216231
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
217232
id.getBytes(StandardCharsets.UTF_8).length, validationException);
218233
}
234+
235+
if (id == null && (versionType == VersionType.INTERNAL && version == Versions.MATCH_ANY) == false) {
236+
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
237+
}
238+
219239
return validationException;
220240
}
221241

@@ -589,10 +609,10 @@ public void process(@Nullable MappingMetaData mappingMd, boolean allowIdGenerati
589609
}
590610

591611
// generate id if not already provided and id generation is allowed
592-
if (allowIdGeneration) {
593-
if (id == null) {
594-
id(UUIDs.base64UUID());
595-
}
612+
if (allowIdGeneration && id == null) {
613+
assert autoGeneratedTimestamp == -1;
614+
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
615+
id(UUIDs.base64UUID());
596616
}
597617

598618
// generate timestamp if not provided, we always have one post this stage...
@@ -639,6 +659,8 @@ public void readFrom(StreamInput in) throws IOException {
639659
version = in.readLong();
640660
versionType = VersionType.fromValue(in.readByte());
641661
pipeline = in.readOptionalString();
662+
isRetry = in.readBoolean();
663+
autoGeneratedTimestamp = in.readLong();
642664
}
643665

644666
@Override
@@ -655,6 +677,8 @@ public void writeTo(StreamOutput out) throws IOException {
655677
out.writeLong(version);
656678
out.writeByte(versionType.getValue());
657679
out.writeOptionalString(pipeline);
680+
out.writeBoolean(isRetry);
681+
out.writeLong(autoGeneratedTimestamp);
658682
}
659683

660684
@Override
@@ -667,4 +691,25 @@ public String toString() {
667691
}
668692
return "index {[" + index + "][" + type + "][" + id + "], source[" + sSource + "]}";
669693
}
694+
695+
696+
/**
697+
* Returns <code>true</code> if this request has been sent to a shard copy more than once.
698+
*/
699+
public boolean isRetry() {
700+
return isRetry;
701+
}
702+
703+
@Override
704+
public void onRetry() {
705+
isRetry = true;
706+
}
707+
708+
/**
709+
* Returns the timestamp the auto generated ID was created or {@value #UNSET_AUTO_GENERATED_TIMESTAMP} if the
710+
* document has no auto generated timestamp. This method will return a positive value iff the id was auto generated.
711+
*/
712+
public long getAutoGeneratedTimestamp() {
713+
return autoGeneratedTimestamp;
714+
}
670715
}

core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,15 +205,6 @@ public IndexRequestBuilder setOpType(IndexRequest.OpType opType) {
205205
return this;
206206
}
207207

208-
/**
209-
* Sets a string representation of the {@link #setOpType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can
210-
* be either "index" or "create".
211-
*/
212-
public IndexRequestBuilder setOpType(String opType) {
213-
request.opType(IndexRequest.OpType.fromString(opType));
214-
return this;
215-
}
216-
217208
/**
218209
* Set to <tt>true</tt> to force this index to use {@link org.elasticsearch.action.index.IndexRequest.OpType#CREATE}.
219210
*/

core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, In
158158
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
159159
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
160160

161-
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType());
161+
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
162162
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
163163
if (update != null) {
164164
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
@@ -171,7 +171,7 @@ public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, In
171171
public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
172172
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
173173
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
174-
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType());
174+
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
175175
}
176176

177177
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,4 +248,12 @@ public String toString() {
248248
public String getDescription() {
249249
return toString();
250250
}
251+
252+
/**
253+
* This method is called before this replication request is retried
254+
* the first time.
255+
*/
256+
public void onRetry() {
257+
// nothing by default
258+
}
251259
}

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,7 @@ void retry(Exception failure) {
722722
return;
723723
}
724724
setPhase(task, "waiting_for_retry");
725+
request.onRetry();
725726
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
726727
observer.waitForNextChange(new ClusterStateObserver.Listener() {
727728
@Override

core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
140140
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
141141
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
142142
EngineConfig.INDEX_CODEC_SETTING,
143+
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
143144
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
144145
// validate that built-in similarities don't get redefined
145146
Setting.groupSetting("index.similarity.", (s) -> {

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,6 @@
8686
import java.util.concurrent.locks.ReentrantReadWriteLock;
8787
import java.util.function.Function;
8888

89-
/**
90-
*
91-
*/
9289
public abstract class Engine implements Closeable {
9390

9491
public static final String SYNC_COMMIT_ID = "sync_id";
@@ -856,19 +853,24 @@ public long endTime() {
856853
public static class Index extends Operation {
857854

858855
private final ParsedDocument doc;
856+
private final long autoGeneratedIdTimestamp;
857+
private final boolean isRetry;
859858
private boolean created;
860859

861-
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
860+
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime,
861+
long autoGeneratedIdTimestamp, boolean isRetry) {
862862
super(uid, version, versionType, origin, startTime);
863863
this.doc = doc;
864+
this.isRetry = isRetry;
865+
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
864866
}
865867

866868
public Index(Term uid, ParsedDocument doc) {
867869
this(uid, doc, Versions.MATCH_ANY);
868-
}
870+
} // TEST ONLY
869871

870-
public Index(Term uid, ParsedDocument doc, long version) {
871-
this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
872+
Index(Term uid, ParsedDocument doc, long version) {
873+
this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false);
872874
}
873875

874876
public ParsedDocument parsedDoc() {
@@ -928,6 +930,23 @@ protected int estimatedSizeInBytes() {
928930
return (id().length() + type().length()) * 2 + source().length() + 12;
929931
}
930932

933+
/**
934+
* Returns a positive timestamp if the ID of this document is auto-generated by elasticsearch.
935+
* if this property is non-negative indexing code might optimize the addition of this document
936+
* due to it's append only nature.
937+
*/
938+
public long getAutoGeneratedIdTimestamp() {
939+
return autoGeneratedIdTimestamp;
940+
}
941+
942+
/**
943+
* Returns <code>true</code> if this index request has been retried on the coordinating node and can therefore be delivered
944+
* multiple times. Note: this might also be set to true if an equivalent event occurred like the replay of the transaction log
945+
*/
946+
public boolean isRetry() {
947+
return isRetry;
948+
}
949+
931950
}
932951

933952
public static class Delete extends Operation {

core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,17 @@ public final class EngineConfig {
8989
}
9090
}, Property.IndexScope, Property.NodeScope);
9191

92-
private TranslogConfig translogConfig;
92+
/**
93+
* Configures an index to optimize documents with auto generated ids for append only. If this setting is updated from <code>false</code>
94+
* to <code>true</code> it might not take effect immediately. In other words, disabling the optimization will be immediately applied while
95+
* re-enabling it might not be applied until the engine is in a safe state to do so. Depending on the engine implementation a change to
96+
* this setting that re-enables the optimization won't be reflected until the engine is restarted or the index is closed and reopened.
97+
* The default is <code>true</code>
98+
*/
99+
public static final Setting<Boolean> INDEX_OPTIMIZE_AUTO_GENERATED_IDS = Setting.boolSetting("index.optimize_auto_generated_id", true,
100+
Property.IndexScope, Property.Dynamic);
101+
102+
private final TranslogConfig translogConfig;
93103
private final OpenMode openMode;
94104

95105
/**
@@ -311,4 +321,12 @@ public enum OpenMode {
311321
public RefreshListeners getRefreshListeners() {
312322
return refreshListeners;
313323
}
324+
325+
/**
326+
* Returns <code>true</code> iff auto generated IDs should be optimized inside the engine for append only.
327+
* The default is <code>true</code>.
328+
*/
329+
public boolean getOptimizeAutoGeneratedIds() {
330+
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
331+
}
314332
}

0 commit comments

Comments
 (0)