Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,9 @@ public IndexResult index(Index index) throws IOException {
* or calls updateDocument.
*/
final IndexingStrategy plan = indexingStrategyForOperation(index);

if (plan.seqNoForIndexing != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.advanceMaxSeqNo(plan.seqNoForIndexing);
}
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get();
Expand Down Expand Up @@ -943,7 +945,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
}
}
}
markSeqNoAsSeen(index.seqNo());
return plan;
}

Expand Down Expand Up @@ -1221,7 +1222,9 @@ public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
lastWriteNanos = delete.startTime();
final DeletionStrategy plan = deletionStrategyForOperation(delete);

if (plan.seqNoOfDeletion != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.advanceMaxSeqNo(plan.seqNoOfDeletion);
}
if (plan.earlyResultOnPreflightError.isPresent()) {
deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
Expand Down Expand Up @@ -1297,7 +1300,6 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
delete.seqNo(), delete.version());
}
}
markSeqNoAsSeen(delete.seqNo());
return plan;
}

Expand Down Expand Up @@ -1453,7 +1455,6 @@ public NoOpResult noOp(final NoOp noOp) throws IOException {
final NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
markSeqNoAsSeen(noOp.seqNo());
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
try {
Expand All @@ -1469,8 +1470,8 @@ public NoOpResult noOp(final NoOp noOp) throws IOException {
private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) {
try (Releasable ignored = noOpKeyedLock.acquire(noOp.seqNo())) {
localCheckpointTracker.advanceMaxSeqNo(noOp.seqNo());
final NoOpResult noOpResult;
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
if (preFlightError.isPresent()) {
Expand Down Expand Up @@ -1508,13 +1509,10 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
noOpResult.setTranslogLocation(location);
}
}
localCheckpointTracker.markSeqNoAsCompleted(noOp.seqNo());
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
}
}

Expand Down Expand Up @@ -2421,13 +2419,6 @@ public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
}

/**
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
*/
protected final void markSeqNoAsSeen(long seqNo) {
localCheckpointTracker.advanceMaxSeqNo(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
Expand Down Expand Up @@ -2541,6 +2532,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.advanceMaxSeqNo(operation.seqNo());
tracker.markSeqNoAsCompleted(operation.seqNo());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ public class LocalCheckpointTracker {
*/
volatile long checkpoint;

/**
* The max sequence number seen in this tracker
*/
private volatile long maxSeqNo;

/**
* The next available sequence number.
*/
private volatile long nextSeqNo;
private long nextSeqNo;

/**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or
Expand All @@ -68,8 +73,9 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint;
this.maxSeqNo = maxSeqNo;
this.nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
this.checkpoint = localCheckpoint;
}

/**
Expand All @@ -78,16 +84,20 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
* @return the next assigned sequence number
*/
public synchronized long generateSeqNo() {
assert nextSeqNo > maxSeqNo : nextSeqNo + " <= " + maxSeqNo;
return nextSeqNo++;
}

/**
* Marks the provided sequence number as seen and updates the max_seq_no if needed.
* Marks the provided sequence number as seen and updates the max_seq_no and next_seq_no if needed.
*/
public synchronized void advanceMaxSeqNo(long seqNo) {
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
}
if (seqNo > maxSeqNo) {
maxSeqNo = seqNo;
}
}

/**
Expand All @@ -96,9 +106,9 @@ public synchronized void advanceMaxSeqNo(long seqNo) {
* @param seqNo the sequence number to mark as completed
*/
public synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
if (seqNo > maxSeqNo) {
assert false : "complete an unseen seq_no=" + seqNo + " > max_seq_no=" + maxSeqNo;
throw new IllegalArgumentException("complete an unseen seq_no=" + seqNo + " > max_seq_no=" + maxSeqNo);
}
if (seqNo <= checkpoint) {
// this is possible during recovery where we might replay an operation that was also replicated
Expand All @@ -122,12 +132,12 @@ public long getCheckpoint() {
}

/**
* The maximum sequence number issued so far.
* The maximum sequence number seen so far.
*
* @return the maximum sequence number
*/
public long getMaxSeqNo() {
return nextSeqNo - 1;
return maxSeqNo;
}


Expand Down Expand Up @@ -159,7 +169,7 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
*/
public boolean contains(final long seqNo) {
assert seqNo >= 0 : "invalid seq_no=" + seqNo;
if (seqNo >= nextSeqNo) {
if (seqNo > maxSeqNo) {
return false;
}
if (seqNo <= checkpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translo
final BytesArray content = new BytesArray(buffer, 0, bytesRead);
final boolean lastChunk = position + content.length() == md.length();
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
requestSeqIdTracker.advanceMaxSeqNo(requestSeqId);
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
cancellableThreads.checkForCancel();
if (error.get() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4064,13 +4064,12 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
}

assertThat(initialEngine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint.get()));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1)));
initialEngine.flush(true, true);

latchReference.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1)));
} finally {
IOUtils.close(initialEngine);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ public void testSimplePrimary() {
long seqNo1, seqNo2;
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
tracker.advanceMaxSeqNo(seqNo1);
assertThat(seqNo1, equalTo(0L));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0L), equalTo(true));
assertThat(tracker.contains(atLeast(1)), equalTo(false));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
tracker.advanceMaxSeqNo(seqNo2);
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
tracker.markSeqNoAsCompleted(seqNo2);
Expand All @@ -81,13 +83,16 @@ public void testSimplePrimary() {
public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false));
tracker.advanceMaxSeqNo(0L);
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0), equalTo(true));
tracker.advanceMaxSeqNo(2L);
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(1L), equalTo(false));
assertThat(tracker.contains(2L), equalTo(true));
tracker.advanceMaxSeqNo(1L);
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.contains(between(0, 2)), equalTo(true));
Expand All @@ -100,6 +105,7 @@ public void testLazyInitialization() {
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
long seqNo = randomNonNegativeLong();
tracker.advanceMaxSeqNo(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
assertThat(tracker.processedSeqNo.size(), equalTo(1));
assertThat(tracker.contains(seqNo), equalTo(true));
Expand All @@ -117,6 +123,7 @@ public void testSimpleOverFlow() {
}
Collections.shuffle(seqNoList, random());
for (Long seqNo : seqNoList) {
tracker.advanceMaxSeqNo(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
Expand Down Expand Up @@ -149,6 +156,7 @@ protected void doRun() throws Exception {
barrier.await();
for (int i = 0; i < opsPerThread; i++) {
long seqNo = tracker.generateSeqNo();
tracker.advanceMaxSeqNo(seqNo);
logger.info("[t{}] started [{}]", threadId, seqNo);
if (seqNo != unFinishedSeq) {
tracker.markSeqNoAsCompleted(seqNo);
Expand Down Expand Up @@ -202,6 +210,7 @@ protected void doRun() throws Exception {
Integer[] ops = seqNoPerThread[threadId];
for (int seqNo : ops) {
if (seqNo != unFinishedSeq) {
tracker.advanceMaxSeqNo(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
logger.info("[t{}] completed [{}]", threadId, seqNo);
}
Expand All @@ -216,6 +225,7 @@ protected void doRun() throws Exception {
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
assertThat(tracker.contains(unFinishedSeq), equalTo(false));
tracker.advanceMaxSeqNo(unFinishedSeq);
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.contains(unFinishedSeq), equalTo(true));
Expand Down Expand Up @@ -251,10 +261,12 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte
final List<Integer> elements = IntStream.rangeClosed(0, seqNo).boxed().collect(Collectors.toList());
Randomness.shuffle(elements);
for (int i = 0; i < elements.size() - 1; i++) {
tracker.advanceMaxSeqNo(elements.get(i));
tracker.markSeqNoAsCompleted(elements.get(i));
assertFalse(complete.get());
}

tracker.advanceMaxSeqNo(elements.get(elements.size() - 1));
tracker.markSeqNoAsCompleted(elements.get(elements.size() - 1));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
Expand All @@ -276,9 +288,23 @@ public void testContains() {
for (int i = 0; i < numOps; i++) {
long seqNo = randomLongBetween(0, 1000);
seqNos.add(seqNo);
tracker.advanceMaxSeqNo(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
}
final long seqNo = randomNonNegativeLong();
assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
}

public void testAdvanceMaxSeqNo() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
assertThat(tracker.generateSeqNo(), equalTo(maxSeqNo + 1));
tracker.advanceMaxSeqNo(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo));
assertThat(tracker.getMaxSeqNo(), equalTo(maxSeqNo));
final long newMaxSeqNo = randomLongBetween(maxSeqNo, maxSeqNo + 10000);
tracker.advanceMaxSeqNo(newMaxSeqNo);
assertThat(tracker.getMaxSeqNo(), equalTo(newMaxSeqNo));
assertThat(tracker.generateSeqNo(), equalTo(newMaxSeqNo + 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
throw new AssertionError("unsupported operation type [" + type + "]");
}
Translog.Location location = translog.add(op);
tracker.advanceMaxSeqNo(id);
tracker.markSeqNoAsCompleted(id);
Translog.Location existing = writtenOps.put(op, location);
if (existing != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
markSeqNoAsSeen(index.seqNo());
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Expand Down Expand Up @@ -104,7 +103,6 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
@Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete);
markSeqNoAsSeen(delete.seqNo());
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
long offset = 0;
while (offset < fileLength && error.get() == null) {
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
requestSeqIdTracker.advanceMaxSeqNo(requestSeqId);
try {
requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected void innerSendBulkShardOperationsRequest(
Consumer<BulkShardOperationsResponse> handler,
Consumer<Exception> errorHandler) {
for(Translog.Operation op : operations) {
tracker.advanceMaxSeqNo(op.seqNo());
tracker.markSeqNoAsCompleted(op.seqNo());
}
receivedOperations.addAll(operations);
Expand Down