From 74f6192ed278e56a1bb348f57f187fa30af19732 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 18 Apr 2017 09:02:03 +0200 Subject: [PATCH] Engine: store maxUnsafeAutoIdTimestamp in commit The `maxUnsafeAutoIdTimestamp` time stamp is a safety marker guaranteeing that no indexing operation with a higher auto gen id timestamp was process by the engine. This allows us to safely process documents without checking if they were seen before. Currently this property is maintained in memory and is handed off from the primary to any replica during the recovery process. This PR takes a more natural approach and stores it in the lucene commit, using the same semantics (no op with a higher time stamp is part of this commit). This means that the knowledge is transferred during the file copy and also means that we don't need to worry about crazy situations where an original append only request arrives at the engine after a retry was processed *and* the engine was restarted. --- .../index/engine/InternalEngine.java | 16 +++++++- .../index/engine/InternalEngineTests.java | 39 ++++++++++++++++--- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5e5b2ed3fecd5..0bed51e0e24a1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -128,6 +128,7 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final EngineConfig.OpenMode openMode; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); + private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -178,6 +179,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); + updateMaxUnsafeAutoIdTimestampFromWriter(writer); // norelease /* * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means @@ -226,6 +228,17 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("created new InternalEngine"); } + private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { + long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; + for (Map.Entry entry : writer.getLiveCommitData()) { + if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue()); + break; + } + } + maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); + } + private static SequenceNumbersService sequenceNumberService( final ShardId shardId, final IndexSettings indexSettings, @@ -500,7 +513,7 @@ private boolean canOptimizeAddDocument(Index index) { return true; case LOCAL_TRANSLOG_RECOVERY: assert index.isRetry(); - return false; // even if retry is set we never optimize local recovery + return true; // allow to optimize in order to update the max safe time stamp default: throw new IllegalArgumentException("unknown origin " + index.origin()); } @@ -1770,6 +1783,7 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog commitData.put(Engine.SYNC_COMMIT_ID, syncId); } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 30d22dfb731c2..26e3676e870a2 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3220,11 +3220,40 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { } - long maxTimestamp = Math.abs(randomLong()); - try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - maxTimestamp, null))) { - assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + final long timestamp1 = Math.abs(randomLong()); + final Path storeDir = createTempDir(); + final Path translogDir = createTempDir(); + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) { + assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } + final long timestamp2 = randomNonNegativeLong(); + final long timestamp3 = randomNonNegativeLong(); + final long maxTimestamp12 = Math.max(timestamp1, timestamp2); + final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3); + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null), + randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + // recover from translog and commit maxTimestamp12 + engine.recoverFromTranslog(); + // force flush as the were no ops performed + engine.flush(true, false); + } + final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(appendOnlyPrimary(doc, true, timestamp3)); + assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.recoverFromTranslog(); + assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } }