Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -136,6 +137,16 @@ public abstract class Engine implements Closeable {
*/
protected volatile long lastWriteNanos = System.nanoTime();

/*
* This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine.
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
* This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized.
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
*/
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);

protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");

Expand Down Expand Up @@ -1781,4 +1792,31 @@ public long getMaxSeenAutoIdTimestamp() {
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

/**
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();

/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
* to advance this marker to at least the given sequence number.
*/
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
Expand Down Expand Up @@ -918,6 +919,7 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw

protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may cause a bwc issue - if the primary is on an old node, it's a problem. Maybe only assert if the index created version is high enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.

final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
Expand Down Expand Up @@ -952,6 +954,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
);
}
}
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
}
return plan;
}

Expand Down Expand Up @@ -1242,6 +1248,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {

protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about index version and bwc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commenting here as I can comment on the relevant lines - can we add an assertion in the index method and delete method that if we update/delete lucene than the seq# of the delete/update is <= msu?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you need the replica logic to be in place for that assertions to be added.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to replicate msu first. I will add these assertions later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.

// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete);
assert incrementVersionLookup();
Expand All @@ -1263,6 +1270,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
}
return plan;
}
Expand Down Expand Up @@ -2548,4 +2556,12 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}


@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,4 +379,9 @@ public DocsStats docStats() {
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ public void updateShardState(final ShardRouting newRouting,
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) :
// indexSettings.getIndexVersionCreated();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
}
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
Expand Down Expand Up @@ -1321,7 +1327,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
final Engine engine = getEngine();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1947,6 +1955,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
// If the old primary was on an old version, this primary (was replica before)
// does not have max_of_updates yet. Thus we need to bootstrap it manually.
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated();
getEngine().initializeMaxSeqNoOfUpdatesOrDeletes();
}
}
}

Expand Down Expand Up @@ -2718,6 +2733,41 @@ void resetEngineToGlobalCheckpoint() throws IOException {
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
});
// TODO: do not use init method here but use advance with the max_seq_no received from the primary
newEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this shard
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
* <p>
* The primary captures this value after executes a replication request, then transfers it to a replica before
* executing that replication request on a replica.
*/
public long getMaxSeqNoOfUpdatesOrDeletes() {
return getEngine().getMaxSeqNoOfUpdatesOrDeletes();
}

/**
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request.
* The receiving value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that
* replication request were processed on it.
* <p>
* A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from
* the primary in peer-recovery, before it replays remote translog operations from the primary. The receiving value is at least
* as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it.
* <p>
* These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -1825,6 +1826,19 @@ public String getTranslogUUID() {
return translogUUID;
}

/**
* Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current
* existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog.
*/
public long getMaxSeqNo() {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current))
.mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max();
assert maxSeqNo.isPresent() : "must have at least one translog generation";
return maxSeqNo.getAsLong();
}
}

TranslogWriter getCurrent() {
return current;
Expand Down
Loading