Skip to content

Commit 275ea68

Browse files
authored
Persist sequence number checkpoints
This commit adds persistence for local and global sequence number checkpoints. We also recover the max sequence number in a shard, although the recovered value can currently be lossy because of delete operations. This will be addressed in a follow-up. Relates #18949
1 parent 601c056 commit 275ea68

File tree

10 files changed

+350
-110
lines changed

10 files changed

+350
-110
lines changed

core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.net.InetAddress;
3737

3838
public abstract class FieldStats<T> implements Writeable, ToXContent {
39+
3940
private final byte type;
4041
private long maxDoc;
4142
private long docCount;
@@ -628,4 +629,5 @@ private final static class Fields {
628629
final static String MAX_VALUE = new String("max_value");
629630
final static String MAX_VALUE_AS_STRING = new String("max_value_as_string");
630631
}
632+
631633
}

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.lucene.util.IOUtils;
4242
import org.apache.lucene.util.InfoStream;
4343
import org.elasticsearch.ExceptionsHelper;
44+
import org.elasticsearch.action.fieldstats.FieldStats;
4445
import org.elasticsearch.common.Nullable;
4546
import org.elasticsearch.common.lease.Releasable;
4647
import org.elasticsearch.common.logging.ESLogger;
@@ -54,8 +55,10 @@
5455
import org.elasticsearch.common.util.concurrent.ReleasableLock;
5556
import org.elasticsearch.index.IndexSettings;
5657
import org.elasticsearch.index.mapper.Uid;
58+
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
5759
import org.elasticsearch.index.merge.MergeStats;
5860
import org.elasticsearch.index.merge.OnGoingMerge;
61+
import org.elasticsearch.index.seqno.SeqNoStats;
5962
import org.elasticsearch.index.seqno.SequenceNumbersService;
6063
import org.elasticsearch.index.shard.DocsStats;
6164
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -78,10 +81,13 @@
7881
import java.util.concurrent.locks.ReentrantLock;
7982
import java.util.function.Function;
8083

84+
import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;
85+
8186
/**
8287
*
8388
*/
8489
public class InternalEngine extends Engine {
90+
8591
/**
8692
* When we last pruned expired tombstones from versionMap.deletes:
8793
*/
@@ -111,6 +117,8 @@ public class InternalEngine extends Engine {
111117
private final IndexThrottle throttle;
112118

113119
private final SequenceNumbersService seqNoService;
120+
final static String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
121+
final static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
114122

115123
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
116124
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we are throttling
@@ -131,12 +139,27 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
131139
boolean success = false;
132140
try {
133141
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
134-
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings());
142+
135143
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
136144
throttle = new IndexThrottle();
137145
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
138146
try {
139147
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
148+
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer);
149+
if (logger.isTraceEnabled()) {
150+
logger.trace(
151+
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
152+
seqNoStats.getMaxSeqNo(),
153+
seqNoStats.getLocalCheckpoint(),
154+
seqNoStats.getGlobalCheckpoint());
155+
}
156+
seqNoService =
157+
new SequenceNumbersService(
158+
shardId,
159+
engineConfig.getIndexSettings(),
160+
seqNoStats.getMaxSeqNo(),
161+
seqNoStats.getLocalCheckpoint(),
162+
seqNoStats.getGlobalCheckpoint());
140163
indexWriter = writer;
141164
translog = openTranslog(engineConfig, writer);
142165
assert translog.getGeneration() != null;
@@ -287,6 +310,36 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
287310
return null;
288311
}
289312

313+
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
314+
final long maxSeqNo;
315+
try (IndexReader reader = DirectoryReader.open(writer)) {
316+
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
317+
if (stats != null) {
318+
maxSeqNo = (long) stats.getMaxValue();
319+
} else {
320+
maxSeqNo = NO_OPS_PERFORMED;
321+
}
322+
}
323+
324+
final Map<String, String> commitUserData = writer.getCommitData();
325+
326+
final long localCheckpoint;
327+
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
328+
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
329+
} else {
330+
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
331+
}
332+
333+
final long globalCheckpoint;
334+
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
335+
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
336+
} else {
337+
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
338+
}
339+
340+
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
341+
}
342+
290343
private SearcherManager createSearcherManager() throws EngineException {
291344
boolean success = false;
292345
SearcherManager searcherManager = null;
@@ -1132,13 +1185,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
11321185
ensureCanFlush();
11331186
try {
11341187
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
1135-
logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
1136-
Map<String, String> commitData = new HashMap<>(2);
1188+
final Map<String, String> commitData = new HashMap<>(5);
1189+
11371190
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
11381191
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
1192+
1193+
commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
1194+
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));
1195+
11391196
if (syncId != null) {
11401197
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
11411198
}
1199+
1200+
if (logger.isTraceEnabled()) {
1201+
logger.trace("committing writer with commit data [{}]", commitData);
1202+
}
1203+
11421204
indexWriter.setCommitData(commitData);
11431205
writer.commit();
11441206
} catch (Throwable ex) {

core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
import org.apache.lucene.document.Field;
2323
import org.apache.lucene.document.NumericDocValuesField;
2424
import org.apache.lucene.index.DocValuesType;
25+
import org.apache.lucene.index.IndexReader;
26+
import org.apache.lucene.index.LeafReader;
27+
import org.apache.lucene.index.LeafReaderContext;
28+
import org.apache.lucene.index.NumericDocValues;
2529
import org.apache.lucene.search.Query;
30+
import org.apache.lucene.util.Bits;
31+
import org.elasticsearch.action.fieldstats.FieldStats;
2632
import org.elasticsearch.common.Nullable;
2733
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -108,6 +114,36 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) {
108114
throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable");
109115
}
110116

117+
@Override
118+
public FieldStats stats(IndexReader reader) throws IOException {
119+
// nocommit remove implementation when late-binding commits
120+
// are possible
121+
final List<LeafReaderContext> leaves = reader.leaves();
122+
if (leaves.isEmpty()) {
123+
return null;
124+
}
125+
126+
long currentMin = Long.MAX_VALUE;
127+
long currentMax = Long.MIN_VALUE;
128+
boolean found = false;
129+
for (int i = 0; i < leaves.size(); i++) {
130+
final LeafReader leaf = leaves.get(i).reader();
131+
final NumericDocValues values = leaf.getNumericDocValues(name());
132+
if (values == null) continue;
133+
final Bits bits = leaf.getLiveDocs();
134+
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
135+
if (bits == null || bits.get(docID)) {
136+
found = true;
137+
final long value = values.get(docID);
138+
currentMin = Math.min(currentMin, value);
139+
currentMax = Math.max(currentMax, value);
140+
}
141+
}
142+
}
143+
144+
return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null;
145+
}
146+
111147
}
112148

113149
public SeqNoFieldMapper(Settings indexSettings) {
@@ -129,7 +165,7 @@ protected void parseCreateField(ParseContext context, List<Field> fields) throws
129165

130166
@Override
131167
public Mapper parse(ParseContext context) throws IOException {
132-
// _seqno added in preparse
168+
// _seq_no added in pre-parse
133169
return null;
134170
}
135171

@@ -157,4 +193,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
157193
protected void doMerge(Mapper mergeWith, boolean updateAllTypes) {
158194
// nothing to do
159195
}
196+
160197
}

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,28 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
6767
*/
6868
final private ObjectLongMap<String> trackingLocalCheckpoint;
6969

70-
private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
70+
private long globalCheckpoint;
7171

72-
public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
72+
/**
73+
* Initialize the global checkpoint service. The {@code globalCheckpoint}
74+
* should be set to the last known global checkpoint for this shard, or
75+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
76+
*
77+
* @param shardId the shard this service is tracking
78+
* local checkpoints for
79+
* @param indexSettings the index settings
80+
* @param globalCheckpoint the last known global checkpoint for this shard,
81+
* or
82+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
83+
*/
84+
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
7385
super(shardId, indexSettings);
7486
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
7587
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
7688
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
89+
this.globalCheckpoint = globalCheckpoint;
7790
}
7891

79-
8092
/**
8193
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
8294
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This is to prevent late
@@ -124,7 +136,7 @@ private boolean updateLocalCheckpointInMap(String allocationId, long localCheckp
124136
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
125137
* of one of the active allocations is not known.
126138
*/
127-
synchronized public boolean updateCheckpointOnPrimary() {
139+
synchronized boolean updateCheckpointOnPrimary() {
128140
long minCheckpoint = Long.MAX_VALUE;
129141
if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) {
130142
return false;
@@ -164,7 +176,7 @@ synchronized public long getCheckpoint() {
164176
/**
165177
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
166178
*/
167-
synchronized public void updateCheckpointOnReplica(long globalCheckpoint) {
179+
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
168180
if (this.globalCheckpoint <= globalCheckpoint) {
169181
this.globalCheckpoint = globalCheckpoint;
170182
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
@@ -241,4 +253,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) {
241253
}
242254
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
243255
}
256+
244257
}

core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.index.IndexSettings;
2424
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
2525
import org.elasticsearch.index.shard.ShardId;
26+
import org.elasticsearch.index.shard.SnapshotStatus;
2627

2728
import java.util.LinkedList;
2829

@@ -39,39 +40,64 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
3940
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024,
4041
4, Setting.Property.IndexScope);
4142

42-
4343
/**
4444
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo}
4545
* which marks the seqNo the first bit in the first array corresponds to.
4646
*/
4747
final LinkedList<FixedBitSet> processedSeqNo;
48-
final int bitArraysSize;
49-
long firstProcessedSeqNo = 0;
48+
private final int bitArraysSize;
49+
long firstProcessedSeqNo;
5050

5151
/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */
52-
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED;
52+
volatile long checkpoint;
5353

5454
/** the next available seqNo - used for seqNo generation */
55-
volatile long nextSeqNo = 0;
56-
55+
private volatile long nextSeqNo;
5756

58-
public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
57+
/**
58+
* Initialize the local checkpoint service. The {@code maxSeqNo} should be
59+
* set to the last sequence number assigned by this shard, or
60+
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and
61+
* {@code localCheckpoint} should be set to the last known local checkpoint
62+
* for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
63+
*
64+
* @param shardId the shard this service is tracking
65+
* local checkpoints for
66+
* @param indexSettings the index settings
67+
* @param maxSeqNo the last sequence number assigned by this shard, or
68+
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
69+
* @param localCheckpoint the last known local checkpoint for this shard, or
70+
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
71+
*/
72+
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
5973
super(shardId, indexSettings);
74+
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
75+
throw new IllegalArgumentException(
76+
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
77+
+ "but was [" + localCheckpoint + "]");
78+
}
79+
if (maxSeqNo < 0 && maxSeqNo != SequenceNumbersService.NO_OPS_PERFORMED) {
80+
throw new IllegalArgumentException(
81+
"max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
82+
}
6083
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
6184
processedSeqNo = new LinkedList<>();
85+
firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
86+
this.nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
87+
this.checkpoint = localCheckpoint;
6288
}
6389

6490
/**
6591
* issue the next sequence number
6692
**/
67-
public synchronized long generateSeqNo() {
93+
synchronized long generateSeqNo() {
6894
return nextSeqNo++;
6995
}
7096

7197
/**
7298
* marks the processing of the given seqNo as completed
7399
**/
74-
public synchronized void markSeqNoAsCompleted(long seqNo) {
100+
synchronized void markSeqNoAsCompleted(long seqNo) {
75101
// make sure we track highest seen seqNo
76102
if (seqNo >= nextSeqNo) {
77103
nextSeqNo = seqNo + 1;
@@ -94,7 +120,7 @@ public long getCheckpoint() {
94120
}
95121

96122
/** gets the maximum seqno seen so far */
97-
public long getMaxSeqNo() {
123+
long getMaxSeqNo() {
98124
return nextSeqNo - 1;
99125
}
100126

@@ -130,19 +156,19 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
130156
*/
131157
private FixedBitSet getBitSetForSeqNo(long seqNo) {
132158
assert Thread.holdsLock(this);
133-
assert seqNo >= firstProcessedSeqNo;
159+
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
134160
int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize;
135161
while (bitSetOffset >= processedSeqNo.size()) {
136162
processedSeqNo.add(new FixedBitSet(bitArraysSize));
137163
}
138164
return processedSeqNo.get(bitSetOffset);
139165
}
140166

141-
142167
/** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */
143168
private int seqNoToBitSetOffset(long seqNo) {
144169
assert Thread.holdsLock(this);
145170
assert seqNo >= firstProcessedSeqNo;
146171
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;
147172
}
173+
148174
}

0 commit comments

Comments
 (0)