Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -400,14 +399,14 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable
public interface ReplicaResponse {

/**
* The local checkpoint for the shard. See {@link SequenceNumbersService#getLocalCheckpoint()}.
* The local checkpoint for the shard.
*
* @return the local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard. See {@link SequenceNumbersService#getGlobalCheckpoint()}.
* The global checkpoint for the shard.
*
* @return the global checkpoint
**/
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -567,7 +567,7 @@ public CommitStats commitStats() {
*
* @return the sequence number service
*/
public abstract SequenceNumbersService seqNoService();
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Read the last segments info from the commit pointed to by the searcher manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

/*
* Holds all the configuration that is used to create an {@link Engine}.
Expand Down Expand Up @@ -78,6 +79,7 @@ public final class EngineConfig {
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -124,7 +126,8 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand Down Expand Up @@ -155,6 +158,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

/**
Expand Down Expand Up @@ -227,6 +231,13 @@ public Store getStore() {
return store;
}

/**
* Returns the global checkpoint tracker
*/
public LongSupplier getGlobalCheckpointSupplier() {
return globalCheckpointSupplier;
}

/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
102 changes: 46 additions & 56 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -67,9 +68,8 @@
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -123,7 +123,7 @@ public class InternalEngine extends Engine {

private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
private final LocalCheckpointTracker localCheckpointTracker;

private final String uidField;

Expand All @@ -150,12 +150,12 @@ public class InternalEngine extends Engine {
private final String historyUUID;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, InternalEngine::sequenceNumberService);
this(engineConfig, LocalCheckpointTracker::new);
}

InternalEngine(
final EngineConfig engineConfig,
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier) {
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
Expand All @@ -179,14 +179,12 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
translog = openTranslog(engineConfig, translogDeletionPolicy, seqNoService::getGlobalCheckpoint);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
Expand Down Expand Up @@ -237,6 +235,27 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
break;
case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo();
maxSeqNo = seqNoStats.v1();
localCheckpoint = seqNoStats.v2();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break;
default: throw new IllegalArgumentException("unknown type: " + openMode);
}
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
Expand Down Expand Up @@ -310,12 +329,12 @@ protected int getRefCount(IndexSearcher reference) {
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
seqNoService.markSeqNoAsCompleted(operation.seqNo());
localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
Expand All @@ -326,17 +345,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqNo = seqNoService.getMaxSeqNo();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
int numNoOpsAdded = 0;
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
seqNo = localCheckpointTracker.getCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]";
assert seqNo <= localCheckpointTracker.getCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + localCheckpointTracker.getCheckpoint() + "]";

}
return numNoOpsAdded;
Expand All @@ -354,35 +373,6 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}

static SequenceNumbersService sequenceNumberService(
final EngineConfig engineConfig,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
default:
throw new IllegalArgumentException(openMode.toString());
}
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -732,7 +722,7 @@ private long generateSeqNoForOperation(final Operation operation) {
* @return the sequence number
*/
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoService.generateSeqNo();
return localCheckpointTracker.generateSeqNo();
}

@Override
Expand Down Expand Up @@ -803,7 +793,7 @@ public IndexResult index(Index index) throws IOException {
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand Down Expand Up @@ -834,7 +824,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1100,7 +1090,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -1126,7 +1116,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1273,7 +1263,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(seqNo);
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
}
}
Expand Down Expand Up @@ -2017,7 +2007,7 @@ private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
Expand All @@ -2040,7 +2030,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
logger.trace("committing writer with commit data [{}]", commitData);
Expand Down Expand Up @@ -2104,8 +2094,8 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

public final SequenceNumbersService seqNoService() {
return seqNoService;
public final LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand All @@ -55,7 +56,7 @@
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier {

/**
* The allocation ID for the shard to which this tracker is a component of.
Expand Down Expand Up @@ -214,7 +215,7 @@ public int hashCode() {
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
Expand Down Expand Up @@ -329,7 +330,7 @@ private static long inSyncCheckpointStates(
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(
public GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -374,6 +375,11 @@ public synchronized long getGlobalCheckpoint() {
return cps.globalCheckpoint;
}

@Override
public long getAsLong() {
return getGlobalCheckpoint();
}

/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
Expand Down
Loading