Skip to content

Commit 404b8dc

Browse files
committed
Introduce sequence number-based recovery
This commit introduces sequence-number-based recovery. When a replica has fallen out of sync, rather than performing a file-based recovery we first attempt to replay operations since the last local checkpoint on the replica. To do this, at the start of recovery the replica tells the primary what its local checkpoint is. The primary will then wait for all operations between that local checkpoint and the current maximum sequence number to complete; this is to ensure that there are no gaps in the operations that will be replayed from the primary to the replica. This is a best-effort attempt as we currently have no guarantees on the primary that these operations will be available; if we are not able to replay all operations in the desired range, we just fallback to file-based recovery. Later work will strengthen the guarantees.
1 parent b893494 commit 404b8dc

32 files changed

+1091
-461
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,6 @@
424424
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySettings.java" checks="LineLength" />
425425
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]PeerRecoverySourceService.java" checks="LineLength" />
426426
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
427-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
428427
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
429428
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
430429
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.index.fielddata.IndexFieldDataService;
3737
import org.elasticsearch.index.mapper.FieldMapper;
3838
import org.elasticsearch.index.mapper.MapperService;
39-
import org.elasticsearch.index.seqno.LocalCheckpointService;
39+
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
4040
import org.elasticsearch.index.similarity.SimilarityService;
4141
import org.elasticsearch.index.store.FsDirectoryService;
4242
import org.elasticsearch.index.store.Store;
@@ -113,7 +113,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
113113
IndexSettings.ALLOW_UNMAPPED,
114114
IndexSettings.INDEX_CHECK_ON_STARTUP,
115115
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
116-
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
116+
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
117117
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
118118
IndexSettings.MAX_SLICES_PER_SCROLL,
119119
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,

core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.elasticsearch.common.util.concurrent;
2021

2122
import org.apache.lucene.store.AlreadyClosedException;

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ public Operation.TYPE getOperationType() {
361361

362362
void setTranslogLocation(Translog.Location translogLocation) {
363363
if (freeze.get() == null) {
364-
assert failure == null : "failure has to be null to set translog location";
364+
assert failure == null || translogLocation == null: "failure has to be null to set translog location";
365365
this.translogLocation = translogLocation;
366366
} else {
367367
throw new IllegalStateException("result is already frozen");
@@ -379,6 +379,7 @@ void setTook(long took) {
379379
void freeze() {
380380
freeze.set(true);
381381
}
382+
382383
}
383384

384385
public static class IndexResult extends Result {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.index.merge.MergeStats;
6464
import org.elasticsearch.index.merge.OnGoingMerge;
6565
import org.elasticsearch.index.seqno.SeqNoStats;
66+
import org.elasticsearch.index.seqno.SequenceNumbers;
6667
import org.elasticsearch.index.seqno.SequenceNumbersService;
6768
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
6869
import org.elasticsearch.index.shard.ShardId;
@@ -119,8 +120,6 @@ public class InternalEngine extends Engine {
119120
private final IndexThrottle throttle;
120121

121122
private final SequenceNumbersService seqNoService;
122-
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
123-
static final String MAX_SEQ_NO = "max_seq_no";
124123

125124
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
126125
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -365,7 +364,7 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
365364
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
366365
final TranslogConfig translogConfig,
367366
final IndexWriter indexWriter) throws IOException {
368-
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
367+
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
369368
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter);
370369
}
371370

@@ -378,20 +377,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
378377
* @return the sequence number stats
379378
*/
380379
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
381-
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
382-
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
383-
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
384-
final String key = entry.getKey();
385-
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
386-
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
387-
localCheckpoint = Long.parseLong(entry.getValue());
388-
} else if (key.equals(MAX_SEQ_NO)) {
389-
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
390-
maxSeqNo = Long.parseLong(entry.getValue());
391-
}
392-
}
393-
394-
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
380+
return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, indexWriter.getLiveCommitData());
395381
}
396382

397383
private SearcherManager createSearcherManager() throws EngineException {
@@ -684,13 +670,20 @@ private IndexResult innerIndex(Index index) throws IOException {
684670
final IndexResult indexResult;
685671
if (checkVersionConflictResult.isPresent()) {
686672
indexResult = checkVersionConflictResult.get();
673+
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication
674+
if (indexResult.hasFailure()) {
675+
location = null;
676+
} else {
677+
final Translog.NoOp operation = new Translog.NoOp(seqNo, index.primaryTerm(), "version conflict during recovery");
678+
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
679+
}
687680
} else {
688681
// no version conflict
689682
if (index.origin() == Operation.Origin.PRIMARY) {
690683
seqNo = seqNoService().generateSeqNo();
691684
}
692685

693-
/**
686+
/*
694687
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
695688
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
696689
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@@ -707,12 +700,11 @@ private IndexResult innerIndex(Index index) throws IOException {
707700
update(index.uid(), index.docs(), indexWriter);
708701
}
709702
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
710-
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
711-
? translog.add(new Translog.Index(index, indexResult))
712-
: null;
713703
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
714-
indexResult.setTranslogLocation(location);
704+
final Translog.Index operation = new Translog.Index(index, indexResult);
705+
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
715706
}
707+
indexResult.setTranslogLocation(location);
716708
indexResult.setTook(System.nanoTime() - index.startTime());
717709
indexResult.freeze();
718710
return indexResult;
@@ -816,21 +808,26 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
816808
final DeleteResult deleteResult;
817809
if (result.isPresent()) {
818810
deleteResult = result.get();
811+
// norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication
812+
if (deleteResult.hasFailure()) {
813+
location = null;
814+
} else {
815+
final Translog.NoOp operation = new Translog.NoOp(seqNo, delete.primaryTerm(), "version conflict during recovery");
816+
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
817+
}
819818
} else {
820819
if (delete.origin() == Operation.Origin.PRIMARY) {
821820
seqNo = seqNoService().generateSeqNo();
822821
}
823-
824822
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
825823
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
826824
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
827-
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
828-
? translog.add(new Translog.Delete(delete, deleteResult))
829-
: null;
830825
versionMap.putUnderLock(delete.uid().bytes(),
831826
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
832-
deleteResult.setTranslogLocation(location);
827+
final Translog.Delete operation = new Translog.Delete(delete, deleteResult);
828+
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(operation) : null;
833829
}
830+
deleteResult.setTranslogLocation(location);
834831
deleteResult.setTook(System.nanoTime() - delete.startTime());
835832
deleteResult.freeze();
836833
return deleteResult;
@@ -1552,11 +1549,11 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
15521549
final Map<String, String> commitData = new HashMap<>(6);
15531550
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
15541551
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
1555-
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
1552+
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
15561553
if (syncId != null) {
15571554
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
15581555
}
1559-
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
1556+
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
15601557
if (logger.isTraceEnabled()) {
15611558
logger.trace("committing writer with commit data [{}]", commitData);
15621559
}

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java renamed to core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,25 @@
2222
import com.carrotsearch.hppc.ObjectLongHashMap;
2323
import com.carrotsearch.hppc.ObjectLongMap;
2424
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
25+
import org.apache.logging.log4j.Logger;
2526
import org.elasticsearch.index.IndexSettings;
26-
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
27-
import org.elasticsearch.index.shard.ShardId;
2827

2928
import java.util.HashSet;
29+
import java.util.Locale;
3030
import java.util.Set;
3131

3232
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
3333

3434
/**
35-
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
36-
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
37-
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
38-
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
39-
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
35+
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
36+
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
37+
* them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
38+
* have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
39+
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
4040
* <p>
4141
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
4242
*/
43-
public class GlobalCheckpointService extends AbstractIndexShardComponent {
43+
public class GlobalCheckpointTracker {
4444

4545
/*
4646
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
@@ -62,20 +62,22 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
6262
*/
6363
private long globalCheckpoint;
6464

65+
private final Logger logger;
66+
6567
/**
66-
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
67-
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
68+
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
69+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
6870
*
69-
* @param shardId the shard this service is tracking local checkpoints for
7071
* @param indexSettings the index settings
7172
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
73+
* @param logger a component logger
7274
*/
73-
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
74-
super(shardId, indexSettings);
75+
GlobalCheckpointTracker(final IndexSettings indexSettings, final long globalCheckpoint, final Logger logger) {
7576
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
7677
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
7778
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
7879
this.globalCheckpoint = globalCheckpoint;
80+
this.logger = logger;
7981
}
8082

8183
/**
@@ -127,8 +129,9 @@ synchronized boolean updateCheckpointOnPrimary() {
127129
minCheckpoint = Math.min(cp.value, minCheckpoint);
128130
}
129131
if (minCheckpoint < globalCheckpoint) {
130-
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
131-
+ "] is lower than previous one [" + globalCheckpoint + "]");
132+
final String message =
133+
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
134+
throw new IllegalStateException(message);
132135
}
133136
if (globalCheckpoint != minCheckpoint) {
134137
logger.trace("global checkpoint updated to [{}]", minCheckpoint);

0 commit comments

Comments
 (0)