Skip to content

Commit edff30f

Browse files
authored
Engine: store maxUnsafeAutoIdTimestamp in commit (#24149)
The `maxUnsafeAutoIdTimestamp` timestamp is a safety marker guaranteeing that no retried-indexing operation with a higher auto gen id timestamp was process by the engine. This allows us to safely process documents without checking if they were seen before. Currently this property is maintained in memory and is handed off from the primary to any replica during the recovery process. This commit takes a more natural approach and stores it in the lucene commit, using the same semantics (no retry op with a higher time stamp is part of this commit). This means that the knowledge is transferred during the file copy and also means that we don't need to worry about crazy situations where an original append only request arrives at the engine after a retry was processed *and* the engine was restarted.
1 parent ab9884b commit edff30f

File tree

2 files changed

+49
-6
lines changed

2 files changed

+49
-6
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public class InternalEngine extends Engine {
128128
private final AtomicInteger throttleRequestCount = new AtomicInteger();
129129
private final EngineConfig.OpenMode openMode;
130130
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
131+
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
131132
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
132133
private final CounterMetric numVersionLookups = new CounterMetric();
133134
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@@ -178,6 +179,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
178179
}
179180
logger.trace("recovered [{}]", seqNoStats);
180181
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
182+
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
181183
// norelease
182184
/*
183185
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
@@ -226,6 +228,17 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
226228
logger.trace("created new InternalEngine");
227229
}
228230

231+
private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
232+
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
233+
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
234+
if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
235+
commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
236+
break;
237+
}
238+
}
239+
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
240+
}
241+
229242
private static SequenceNumbersService sequenceNumberService(
230243
final ShardId shardId,
231244
final IndexSettings indexSettings,
@@ -500,7 +513,7 @@ private boolean canOptimizeAddDocument(Index index) {
500513
return true;
501514
case LOCAL_TRANSLOG_RECOVERY:
502515
assert index.isRetry();
503-
return false; // even if retry is set we never optimize local recovery
516+
return true; // allow to optimize in order to update the max safe time stamp
504517
default:
505518
throw new IllegalArgumentException("unknown origin " + index.origin());
506519
}
@@ -1770,6 +1783,7 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
17701783
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
17711784
}
17721785
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
1786+
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
17731787
logger.trace("committing writer with commit data [{}]", commitData);
17741788
return commitData.entrySet().iterator();
17751789
});

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3223,11 +3223,40 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
32233223

32243224
}
32253225

3226-
long maxTimestamp = Math.abs(randomLong());
3227-
try (Store store = createStore();
3228-
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
3229-
maxTimestamp, null))) {
3230-
assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
3226+
final long timestamp1 = Math.abs(randomLong());
3227+
final Path storeDir = createTempDir();
3228+
final Path translogDir = createTempDir();
3229+
try (Store store = createStore(newFSDirectory(storeDir));
3230+
Engine engine = new InternalEngine(
3231+
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) {
3232+
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
3233+
}
3234+
final long timestamp2 = randomNonNegativeLong();
3235+
final long timestamp3 = randomNonNegativeLong();
3236+
final long maxTimestamp12 = Math.max(timestamp1, timestamp2);
3237+
final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3);
3238+
try (Store store = createStore(newFSDirectory(storeDir));
3239+
Engine engine = new InternalEngine(
3240+
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null),
3241+
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
3242+
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
3243+
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
3244+
// recover from translog and commit maxTimestamp12
3245+
engine.recoverFromTranslog();
3246+
// force flush as the were no ops performed
3247+
engine.flush(true, false);
3248+
}
3249+
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
3250+
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
3251+
engine.index(appendOnlyPrimary(doc, true, timestamp3));
3252+
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
3253+
}
3254+
try (Store store = createStore(newFSDirectory(storeDir));
3255+
Engine engine = new InternalEngine(
3256+
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
3257+
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
3258+
engine.recoverFromTranslog();
3259+
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
32313260
}
32323261
}
32333262

0 commit comments

Comments
 (0)