From 16dccd6d98eeee11a7baa47b46126be8da13d6e4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 26 Feb 2019 21:58:36 -0500 Subject: [PATCH] Explicitly advance max_seq_no before indexing --- .../index/engine/InternalEngine.java | 28 +++++++---------- .../index/seqno/LocalCheckpointTracker.java | 30 ++++++++++++------- .../recovery/RecoverySourceHandler.java | 1 + .../index/engine/InternalEngineTests.java | 3 +- .../seqno/LocalCheckpointTrackerTests.java | 26 ++++++++++++++++ .../index/translog/TranslogTests.java | 1 + .../ccr/index/engine/FollowingEngine.java | 2 -- .../xpack/ccr/repository/CcrRepository.java | 1 + .../ShardFollowNodeTaskRandomTests.java | 1 + 9 files changed, 61 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 306baf2da0957..9d202077c4718 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -852,7 +852,9 @@ public IndexResult index(Index index) throws IOException { * or calls updateDocument. */ final IndexingStrategy plan = indexingStrategyForOperation(index); - + if (plan.seqNoForIndexing != SequenceNumbers.UNASSIGNED_SEQ_NO) { + localCheckpointTracker.advanceMaxSeqNo(plan.seqNoForIndexing); + } final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { indexResult = plan.earlyResultOnPreFlightError.get(); @@ -943,7 +945,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } } } - markSeqNoAsSeen(index.seqNo()); return plan; } @@ -1221,7 +1222,9 @@ public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); lastWriteNanos = delete.startTime(); final DeletionStrategy plan = deletionStrategyForOperation(delete); - + if (plan.seqNoOfDeletion != SequenceNumbers.UNASSIGNED_SEQ_NO) { + localCheckpointTracker.advanceMaxSeqNo(plan.seqNoOfDeletion); + } if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); } else if (plan.deleteFromLucene || plan.addStaleOpToLucene) { @@ -1297,7 +1300,6 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws delete.seqNo(), delete.version()); } } - markSeqNoAsSeen(delete.seqNo()); return plan; } @@ -1453,7 +1455,6 @@ public NoOpResult noOp(final NoOp noOp) throws IOException { final NoOpResult noOpResult; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - markSeqNoAsSeen(noOp.seqNo()); noOpResult = innerNoOp(noOp); } catch (final Exception e) { try { @@ -1469,8 +1470,8 @@ public NoOpResult noOp(final NoOp noOp) throws IOException { private NoOpResult innerNoOp(final NoOp noOp) throws IOException { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED; - final long seqNo = noOp.seqNo(); - try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) { + try (Releasable ignored = noOpKeyedLock.acquire(noOp.seqNo())) { + localCheckpointTracker.advanceMaxSeqNo(noOp.seqNo()); final NoOpResult noOpResult; final Optional preFlightError = preFlightCheckForNoOp(noOp); if (preFlightError.isPresent()) { @@ -1508,13 +1509,10 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { noOpResult.setTranslogLocation(location); } } + localCheckpointTracker.markSeqNoAsCompleted(noOp.seqNo()); noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); return noOpResult; - } finally { - if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - localCheckpointTracker.markSeqNoAsCompleted(seqNo); - } } } @@ -2421,13 +2419,6 @@ public long getLocalCheckpoint() { return localCheckpointTracker.getCheckpoint(); } - /** - * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value. - */ - protected final void markSeqNoAsSeen(long seqNo) { - localCheckpointTracker.advanceMaxSeqNo(seqNo); - } - /** * Checks if the given operation has been processed in this engine or not. * @return true if the given operation was processed; otherwise false. @@ -2541,6 +2532,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + tracker.advanceMaxSeqNo(operation.seqNo()); tracker.markSeqNoAsCompleted(operation.seqNo()); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index a19d9ac4abb94..0f6f9c09219a1 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -45,10 +45,15 @@ public class LocalCheckpointTracker { */ volatile long checkpoint; + /** + * The max sequence number seen in this tracker + */ + private volatile long maxSeqNo; + /** * The next available sequence number. */ - private volatile long nextSeqNo; + private long nextSeqNo; /** * Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or @@ -68,8 +73,9 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) { throw new IllegalArgumentException( "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); } - nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; - checkpoint = localCheckpoint; + this.maxSeqNo = maxSeqNo; + this.nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; + this.checkpoint = localCheckpoint; } /** @@ -78,16 +84,20 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) { * @return the next assigned sequence number */ public synchronized long generateSeqNo() { + assert nextSeqNo > maxSeqNo : nextSeqNo + " <= " + maxSeqNo; return nextSeqNo++; } /** - * Marks the provided sequence number as seen and updates the max_seq_no if needed. + * Marks the provided sequence number as seen and updates the max_seq_no and next_seq_no if needed. */ public synchronized void advanceMaxSeqNo(long seqNo) { if (seqNo >= nextSeqNo) { nextSeqNo = seqNo + 1; } + if (seqNo > maxSeqNo) { + maxSeqNo = seqNo; + } } /** @@ -96,9 +106,9 @@ public synchronized void advanceMaxSeqNo(long seqNo) { * @param seqNo the sequence number to mark as completed */ public synchronized void markSeqNoAsCompleted(final long seqNo) { - // make sure we track highest seen sequence number - if (seqNo >= nextSeqNo) { - nextSeqNo = seqNo + 1; + if (seqNo > maxSeqNo) { + assert false : "complete an unseen seq_no=" + seqNo + " > max_seq_no=" + maxSeqNo; + throw new IllegalArgumentException("complete an unseen seq_no=" + seqNo + " > max_seq_no=" + maxSeqNo); } if (seqNo <= checkpoint) { // this is possible during recovery where we might replay an operation that was also replicated @@ -122,12 +132,12 @@ public long getCheckpoint() { } /** - * The maximum sequence number issued so far. + * The maximum sequence number seen so far. * * @return the maximum sequence number */ public long getMaxSeqNo() { - return nextSeqNo - 1; + return maxSeqNo; } @@ -159,7 +169,7 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt */ public boolean contains(final long seqNo) { assert seqNo >= 0 : "invalid seq_no=" + seqNo; - if (seqNo >= nextSeqNo) { + if (seqNo > maxSeqNo) { return false; } if (seqNo <= checkpoint) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6bca848a361fa..8f63853fa20b5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -698,6 +698,7 @@ void sendFiles(Store store, StoreFileMetaData[] files, Supplier translo final BytesArray content = new BytesArray(buffer, 0, bytesRead); final boolean lastChunk = position + content.length() == md.length(); final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + requestSeqIdTracker.advanceMaxSeqNo(requestSeqId); cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks)); cancellableThreads.checkForCancel(); if (error.get() != null) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a48216ecaa73a..c709c78474ca3 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4064,13 +4064,12 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro } assertThat(initialEngine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint.get())); - assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1))); initialEngine.flush(true, true); - latchReference.get().countDown(); for (final Thread thread : threads) { thread.join(); } + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1))); } finally { IOUtils.close(initialEngine); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 44b3794ea6d42..4e0a24248a992 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -59,6 +59,7 @@ public void testSimplePrimary() { long seqNo1, seqNo2; assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); seqNo1 = tracker.generateSeqNo(); + tracker.advanceMaxSeqNo(seqNo1); assertThat(seqNo1, equalTo(0L)); tracker.markSeqNoAsCompleted(seqNo1); assertThat(tracker.getCheckpoint(), equalTo(0L)); @@ -66,6 +67,7 @@ public void testSimplePrimary() { assertThat(tracker.contains(atLeast(1)), equalTo(false)); seqNo1 = tracker.generateSeqNo(); seqNo2 = tracker.generateSeqNo(); + tracker.advanceMaxSeqNo(seqNo2); assertThat(seqNo1, equalTo(1L)); assertThat(seqNo2, equalTo(2L)); tracker.markSeqNoAsCompleted(seqNo2); @@ -81,13 +83,16 @@ public void testSimplePrimary() { public void testSimpleReplica() { assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false)); + tracker.advanceMaxSeqNo(0L); tracker.markSeqNoAsCompleted(0L); assertThat(tracker.getCheckpoint(), equalTo(0L)); assertThat(tracker.contains(0), equalTo(true)); + tracker.advanceMaxSeqNo(2L); tracker.markSeqNoAsCompleted(2L); assertThat(tracker.getCheckpoint(), equalTo(0L)); assertThat(tracker.contains(1L), equalTo(false)); assertThat(tracker.contains(2L), equalTo(true)); + tracker.advanceMaxSeqNo(1L); tracker.markSeqNoAsCompleted(1L); assertThat(tracker.getCheckpoint(), equalTo(2L)); assertThat(tracker.contains(between(0, 2)), equalTo(true)); @@ -100,6 +105,7 @@ public void testLazyInitialization() { * sequence numbers this could lead to excessive memory usage resulting in out of memory errors. */ long seqNo = randomNonNegativeLong(); + tracker.advanceMaxSeqNo(seqNo); tracker.markSeqNoAsCompleted(seqNo); assertThat(tracker.processedSeqNo.size(), equalTo(1)); assertThat(tracker.contains(seqNo), equalTo(true)); @@ -117,6 +123,7 @@ public void testSimpleOverFlow() { } Collections.shuffle(seqNoList, random()); for (Long seqNo : seqNoList) { + tracker.advanceMaxSeqNo(seqNo); tracker.markSeqNoAsCompleted(seqNo); } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); @@ -149,6 +156,7 @@ protected void doRun() throws Exception { barrier.await(); for (int i = 0; i < opsPerThread; i++) { long seqNo = tracker.generateSeqNo(); + tracker.advanceMaxSeqNo(seqNo); logger.info("[t{}] started [{}]", threadId, seqNo); if (seqNo != unFinishedSeq) { tracker.markSeqNoAsCompleted(seqNo); @@ -202,6 +210,7 @@ protected void doRun() throws Exception { Integer[] ops = seqNoPerThread[threadId]; for (int seqNo : ops) { if (seqNo != unFinishedSeq) { + tracker.advanceMaxSeqNo(seqNo); tracker.markSeqNoAsCompleted(seqNo); logger.info("[t{}] completed [{}]", threadId, seqNo); } @@ -216,6 +225,7 @@ protected void doRun() throws Exception { assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); assertThat(tracker.contains(unFinishedSeq), equalTo(false)); + tracker.advanceMaxSeqNo(unFinishedSeq); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.contains(unFinishedSeq), equalTo(true)); @@ -251,10 +261,12 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte final List elements = IntStream.rangeClosed(0, seqNo).boxed().collect(Collectors.toList()); Randomness.shuffle(elements); for (int i = 0; i < elements.size() - 1; i++) { + tracker.advanceMaxSeqNo(elements.get(i)); tracker.markSeqNoAsCompleted(elements.get(i)); assertFalse(complete.get()); } + tracker.advanceMaxSeqNo(elements.get(elements.size() - 1)); tracker.markSeqNoAsCompleted(elements.get(elements.size() - 1)); // synchronize with the waiting thread to mark that it is complete barrier.await(); @@ -276,9 +288,23 @@ public void testContains() { for (int i = 0; i < numOps; i++) { long seqNo = randomLongBetween(0, 1000); seqNos.add(seqNo); + tracker.advanceMaxSeqNo(seqNo); tracker.markSeqNoAsCompleted(seqNo); } final long seqNo = randomNonNegativeLong(); assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo))); } + + public void testAdvanceMaxSeqNo() { + final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100); + final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint); + assertThat(tracker.generateSeqNo(), equalTo(maxSeqNo + 1)); + tracker.advanceMaxSeqNo(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo)); + assertThat(tracker.getMaxSeqNo(), equalTo(maxSeqNo)); + final long newMaxSeqNo = randomLongBetween(maxSeqNo, maxSeqNo + 10000); + tracker.advanceMaxSeqNo(newMaxSeqNo); + assertThat(tracker.getMaxSeqNo(), equalTo(newMaxSeqNo)); + assertThat(tracker.generateSeqNo(), equalTo(newMaxSeqNo + 1)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 3eddeea2f2a8a..a37e8593b9520 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -985,6 +985,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep throw new AssertionError("unsupported operation type [" + type + "]"); } Translog.Location location = translog.add(op); + tracker.advanceMaxSeqNo(id); tracker.markSeqNoAsCompleted(id); Translog.Location existing = writtenOps.put(op, location); if (existing != null) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index acffacd4051d5..17b32776a5230 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -68,7 +68,6 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - markSeqNoAsSeen(index.seqNo()); // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; @@ -104,7 +103,6 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind @Override protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { preFlight(delete); - markSeqNoAsSeen(delete.seqNo()); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0b445a3eb01ef..ce579cb92d478 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -490,6 +490,7 @@ protected void restoreFiles(List filesToRecover, Store store) throws I long offset = 0; while (offset < fileLength && error.get() == null) { final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + requestSeqIdTracker.advanceMaxSeqNo(requestSeqId); try { requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 46c7c51586c53..ebb4e322d03bd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -129,6 +129,7 @@ protected void innerSendBulkShardOperationsRequest( Consumer handler, Consumer errorHandler) { for(Translog.Operation op : operations) { + tracker.advanceMaxSeqNo(op.seqNo()); tracker.markSeqNoAsCompleted(op.seqNo()); } receivedOperations.addAll(operations);