Skip to content

Commit badb2be

Browse files
authored
Peer Recovery: remove maxUnsafeAutoIdTimestamp hand off (#24243)
With #24149, maxUnsafeAutoIdTimestamp is now stored in the Lucene commit and is implicitly transferred during the file phase of the recovery.
1 parent 63e5aff commit badb2be

15 files changed

+102
-117
lines changed

core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.lucene.search.ReferenceManager;
2828
import org.apache.lucene.search.Sort;
2929
import org.apache.lucene.search.similarities.Similarity;
30-
import org.elasticsearch.action.index.IndexRequest;
3130
import org.elasticsearch.common.Nullable;
3231
import org.elasticsearch.common.settings.Setting;
3332
import org.elasticsearch.common.settings.Setting.Property;
@@ -67,7 +66,6 @@ public final class EngineConfig {
6766
private final Engine.EventListener eventListener;
6867
private final QueryCache queryCache;
6968
private final QueryCachingPolicy queryCachingPolicy;
70-
private final long maxUnsafeAutoIdTimestamp;
7169
@Nullable
7270
private final ReferenceManager.RefreshListener refreshListeners;
7371
@Nullable
@@ -116,7 +114,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
116114
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
117115
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
118116
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
119-
long maxUnsafeAutoIdTimestamp, Sort indexSort) {
117+
Sort indexSort) {
120118
if (openMode == null) {
121119
throw new IllegalArgumentException("openMode must not be null");
122120
}
@@ -143,9 +141,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
143141
this.flushMergesAfter = flushMergesAfter;
144142
this.openMode = openMode;
145143
this.refreshListeners = refreshListeners;
146-
assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP :
147-
"maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp;
148-
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
149144
this.indexSort = indexSort;
150145
}
151146

@@ -333,11 +328,10 @@ public ReferenceManager.RefreshListener getRefreshListeners() {
333328
}
334329

335330
/**
336-
* Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
337-
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
331+
* Returns true if the engine is allowed to optimize indexing operations with an auto-generated ID.
338332
*/
339-
public long getMaxUnsafeAutoIdTimestamp() {
340-
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE;
333+
public boolean isAutoGeneratedIDsOptimizationEnabled() {
334+
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
341335
}
342336

343337
/**

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,16 @@ public class InternalEngine extends Engine {
128128
private final AtomicInteger throttleRequestCount = new AtomicInteger();
129129
private final EngineConfig.OpenMode openMode;
130130
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
131-
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
131+
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
132132
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
133133
private final CounterMetric numVersionLookups = new CounterMetric();
134134
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
135135

136136
public InternalEngine(EngineConfig engineConfig) throws EngineException {
137137
super(engineConfig);
138138
openMode = engineConfig.getOpenMode();
139-
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_beta1)) {
140-
// no optimization for pre 5.0.0.alpha6 since translog might not have all information needed
139+
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
141140
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
142-
} else {
143-
maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp());
144141
}
145142
this.versionMap = new LiveVersionMap();
146143
store.incRef();
@@ -1836,7 +1833,7 @@ public void onSettingsChanged() {
18361833
mergeScheduler.refreshConfig();
18371834
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
18381835
maybePruneDeletedTombstones();
1839-
if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) {
1836+
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
18401837
// this is an anti-viral setting: you can only opt out for the entire index
18411838
// only if a shard starts up again due to relocation or if the index is closed
18421839
// the setting will be re-interpreted if it's set to true

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.lucene.index.IndexFormatTooOldException;
2828
import org.apache.lucene.index.IndexWriter;
2929
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
30+
import org.apache.lucene.index.SegmentInfos;
3031
import org.apache.lucene.index.SnapshotDeletionPolicy;
3132
import org.apache.lucene.index.Term;
3233
import org.apache.lucene.search.Query;
@@ -38,11 +39,11 @@
3839
import org.apache.lucene.util.IOUtils;
3940
import org.apache.lucene.util.ThreadInterruptedException;
4041
import org.elasticsearch.ElasticsearchException;
42+
import org.elasticsearch.Version;
4143
import org.elasticsearch.action.ActionListener;
4244
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
4345
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
4446
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
45-
import org.elasticsearch.action.index.IndexRequest;
4647
import org.elasticsearch.cluster.metadata.IndexMetaData;
4748
import org.elasticsearch.cluster.metadata.MappingMetaData;
4849
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -79,6 +80,7 @@
7980
import org.elasticsearch.index.engine.EngineConfig;
8081
import org.elasticsearch.index.engine.EngineException;
8182
import org.elasticsearch.index.engine.EngineFactory;
83+
import org.elasticsearch.index.engine.InternalEngine;
8284
import org.elasticsearch.index.engine.InternalEngineFactory;
8385
import org.elasticsearch.index.engine.RefreshFailedEngineException;
8486
import org.elasticsearch.index.engine.Segment;
@@ -1040,11 +1042,11 @@ public void performTranslogRecovery(boolean indexExists) throws IOException {
10401042
translogStats.totalOperations(0);
10411043
translogStats.totalOperationsOnStart(0);
10421044
}
1043-
internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
1045+
internalPerformTranslogRecovery(false, indexExists);
10441046
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
10451047
}
10461048

1047-
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
1049+
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
10481050
if (state != IndexShardState.RECOVERING) {
10491051
throw new IndexShardNotRecoveringException(shardId, state);
10501052
}
@@ -1073,7 +1075,10 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
10731075
} else {
10741076
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
10751077
}
1076-
final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
1078+
1079+
assert indexExists == false || assertMaxUnsafeAutoIdInCommit();
1080+
1081+
final EngineConfig config = newEngineConfig(openMode);
10771082
// we disable deletes since we allow for operations to be executed against the shard while recovering
10781083
// but we need to make sure we don't lose deletes until we are done recovering
10791084
config.setEnableGcDeletes(false);
@@ -1087,6 +1092,22 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
10871092
}
10881093
}
10891094

1095+
private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
1096+
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
1097+
if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) {
1098+
// as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point.
1099+
// This should have been baked into the commit by the primary we recover from, regardless of the index age.
1100+
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
1101+
"recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit";
1102+
} else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
1103+
indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
1104+
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
1105+
"opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
1106+
+ " is not found in commit";
1107+
}
1108+
return true;
1109+
}
1110+
10901111
protected void onNewEngine(Engine newEngine) {
10911112
refreshListeners.setTranslog(newEngine.getTranslog());
10921113
}
@@ -1096,9 +1117,9 @@ protected void onNewEngine(Engine newEngine) {
10961117
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
10971118
* a remote peer.
10981119
*/
1099-
public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException {
1120+
public void skipTranslogRecovery() throws IOException {
11001121
assert getEngineOrNull() == null : "engine was already created";
1101-
internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp);
1122+
internalPerformTranslogRecovery(true, true);
11021123
assert recoveryState.getTranslog().recoveredOperations() == 0;
11031124
}
11041125

@@ -1795,14 +1816,13 @@ private DocumentMapperForType docMapper(String type) {
17951816
return mapperService.documentMapperWithAutoCreate(type);
17961817
}
17971818

1798-
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) {
1819+
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
17991820
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
18001821
Sort indexSort = indexSortSupplier.get();
18011822
return new EngineConfig(openMode, shardId,
18021823
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
18031824
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
1804-
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners,
1805-
maxUnsafeAutoIdTimestamp, indexSort);
1825+
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
18061826
}
18071827

18081828
/**

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.lucene.store.IOContext;
3232
import org.apache.lucene.store.IndexInput;
3333
import org.elasticsearch.ExceptionsHelper;
34-
import org.elasticsearch.action.index.IndexRequest;
3534
import org.elasticsearch.cluster.metadata.IndexMetaData;
3635
import org.elasticsearch.cluster.metadata.MappingMetaData;
3736
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -353,7 +352,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
353352
recoveryState.getIndex().updateVersion(version);
354353
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
355354
assert indexShouldExists;
356-
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
355+
indexShard.skipTranslogRecovery();
357356
} else {
358357
// since we recover from local, just fill the files and size
359358
try {
@@ -405,7 +404,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
405404
}
406405
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
407406
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
408-
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
407+
indexShard.skipTranslogRecovery();
409408
indexShard.finalizeRecovery();
410409
indexShard.postRecovery("restore done");
411410
} catch (Exception e) {

core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
377377
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
378378
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
379379
)) {
380-
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp());
380+
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
381381
}
382382
channel.sendResponse(TransportResponse.Empty.INSTANCE);
383383
}

core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.indices.recovery;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.index.IndexRequest;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -29,19 +30,17 @@
2930

3031
public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
3132

32-
private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
3333
private long recoveryId;
3434
private ShardId shardId;
3535
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
3636

3737
public RecoveryPrepareForTranslogOperationsRequest() {
3838
}
3939

40-
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) {
40+
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
4141
this.recoveryId = recoveryId;
4242
this.shardId = shardId;
4343
this.totalTranslogOps = totalTranslogOps;
44-
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
4544
}
4645

4746
public long recoveryId() {
@@ -56,17 +55,15 @@ public int totalTranslogOps() {
5655
return totalTranslogOps;
5756
}
5857

59-
public long getMaxUnsafeAutoIdTimestamp() {
60-
return maxUnsafeAutoIdTimestamp;
61-
}
62-
6358
@Override
6459
public void readFrom(StreamInput in) throws IOException {
6560
super.readFrom(in);
6661
recoveryId = in.readLong();
6762
shardId = ShardId.readShardId(in);
6863
totalTranslogOps = in.readVInt();
69-
maxUnsafeAutoIdTimestamp = in.readLong();
64+
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
65+
in.readLong(); // maxUnsafeAutoIdTimestamp
66+
}
7067
}
7168

7269
@Override
@@ -75,6 +72,8 @@ public void writeTo(StreamOutput out) throws IOException {
7572
out.writeLong(recoveryId);
7673
shardId.writeTo(out);
7774
out.writeVInt(totalTranslogOps);
78-
out.writeLong(maxUnsafeAutoIdTimestamp);
75+
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
76+
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
77+
}
7978
}
8079
}

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
157157
}
158158

159159
try {
160-
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
160+
prepareTargetForTranslog(translogView.totalOperations());
161161
} catch (final Exception e) {
162162
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
163163
}
@@ -389,13 +389,13 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
389389
}
390390
}
391391

392-
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
392+
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
393393
StopWatch stopWatch = new StopWatch().start();
394394
logger.trace("recovery [phase1]: prepare remote engine for translog");
395395
final long startEngineStart = stopWatch.totalTime().millis();
396396
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
397397
// garbage collection (not the JVM's GC!) of tombstone deletes.
398-
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
398+
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
399399
stopWatch.stop();
400400

401401
response.startTime = stopWatch.totalTime().millis() - startEngineStart;

core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.logging.Loggers;
3838
import org.elasticsearch.common.lucene.Lucene;
39-
import org.elasticsearch.common.unit.TimeValue;
4039
import org.elasticsearch.common.util.Callback;
4140
import org.elasticsearch.common.util.CancellableThreads;
4241
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
@@ -49,7 +48,6 @@
4948
import org.elasticsearch.index.translog.Translog;
5049

5150
import java.io.IOException;
52-
import java.util.ArrayList;
5351
import java.util.Arrays;
5452
import java.util.Collections;
5553
import java.util.Iterator;
@@ -58,8 +56,6 @@
5856
import java.util.Map.Entry;
5957
import java.util.concurrent.ConcurrentMap;
6058
import java.util.concurrent.CountDownLatch;
61-
import java.util.concurrent.TimeUnit;
62-
import java.util.concurrent.TimeoutException;
6359
import java.util.concurrent.atomic.AtomicBoolean;
6460
import java.util.concurrent.atomic.AtomicLong;
6561

@@ -360,9 +356,9 @@ private void ensureRefCount() {
360356
/*** Implementation of {@link RecoveryTargetHandler } */
361357

362358
@Override
363-
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
359+
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
364360
state().getTranslog().totalOperations(totalTranslogOps);
365-
indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp);
361+
indexShard().skipTranslogRecovery();
366362
}
367363

368364
@Override

core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,8 @@ public interface RecoveryTargetHandler {
3333
* Prepares the target to receive translog operations, after all files have been copied
3434
*
3535
* @param totalTranslogOps total translog operations expected to be sent
36-
* @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
37-
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
3836
*/
39-
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;
37+
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
4038

4139
/**
4240
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

0 commit comments

Comments
 (0)