Skip to content

Commit 9799d71

Browse files
committed
Persist sequence number checkpoints
This commit adds persistence for local and global sequence number checkpoints. We also recover the max sequence number in a shard, although there will be loss here today from delete operations. This will be addressed in a follow-up.
1 parent 601c056 commit 9799d71

File tree

8 files changed

+309
-75
lines changed

8 files changed

+309
-75
lines changed

core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.net.InetAddress;
3737

3838
public abstract class FieldStats<T> implements Writeable, ToXContent {
39+
3940
private final byte type;
4041
private long maxDoc;
4142
private long docCount;
@@ -628,4 +629,5 @@ private final static class Fields {
628629
final static String MAX_VALUE = new String("max_value");
629630
final static String MAX_VALUE_AS_STRING = new String("max_value_as_string");
630631
}
632+
631633
}

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.lucene.util.IOUtils;
4242
import org.apache.lucene.util.InfoStream;
4343
import org.elasticsearch.ExceptionsHelper;
44+
import org.elasticsearch.action.fieldstats.FieldStats;
4445
import org.elasticsearch.common.Nullable;
4546
import org.elasticsearch.common.lease.Releasable;
4647
import org.elasticsearch.common.logging.ESLogger;
@@ -54,8 +55,11 @@
5455
import org.elasticsearch.common.util.concurrent.ReleasableLock;
5556
import org.elasticsearch.index.IndexSettings;
5657
import org.elasticsearch.index.mapper.Uid;
58+
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
5759
import org.elasticsearch.index.merge.MergeStats;
5860
import org.elasticsearch.index.merge.OnGoingMerge;
61+
import org.elasticsearch.index.seqno.GlobalCheckpointService;
62+
import org.elasticsearch.index.seqno.LocalCheckpointService;
5963
import org.elasticsearch.index.seqno.SequenceNumbersService;
6064
import org.elasticsearch.index.shard.DocsStats;
6165
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -131,12 +135,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
131135
boolean success = false;
132136
try {
133137
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
134-
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings());
138+
135139
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
136140
throttle = new IndexThrottle();
137141
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
138142
try {
139143
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
144+
final long localCheckpoint = loadLocalCheckpointFromCommit(writer);
145+
final long globalCheckpoint = loadGlobalCheckpointFromCommit(writer);
146+
final long maxSeqNo = loadMaxSeqNoFromCommit(writer);
147+
if (logger.isTraceEnabled()) {
148+
logger.trace(
149+
"recovering local checkpoint: [{}], global checkpoint [{}], max sequence number [{}]",
150+
localCheckpoint,
151+
globalCheckpoint,
152+
maxSeqNo);
153+
}
154+
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings(), maxSeqNo, localCheckpoint, globalCheckpoint);
140155
indexWriter = writer;
141156
translog = openTranslog(engineConfig, writer);
142157
assert translog.getGeneration() != null;
@@ -287,6 +302,35 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
287302
return null;
288303
}
289304

305+
private long loadLocalCheckpointFromCommit(IndexWriter writer) {
306+
final Map<String, String> commitUserData = writer.getCommitData();
307+
if (commitUserData.containsKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)) {
308+
return Long.parseLong(commitUserData.get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY));
309+
} else {
310+
return SequenceNumbersService.NO_OPS_PERFORMED;
311+
}
312+
}
313+
314+
private long loadGlobalCheckpointFromCommit(IndexWriter writer) {
315+
final Map<String, String> commitUserData = writer.getCommitData();
316+
if (commitUserData.containsKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)) {
317+
return Long.parseLong(commitUserData.get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY));
318+
} else {
319+
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
320+
}
321+
}
322+
323+
private long loadMaxSeqNoFromCommit(IndexWriter writer) throws IOException {
324+
try (IndexReader reader = DirectoryReader.open(writer)) {
325+
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
326+
if (stats != null) {
327+
return (long) stats.getMaxValue();
328+
} else {
329+
return SequenceNumbersService.NO_OPS_PERFORMED;
330+
}
331+
}
332+
}
333+
290334
private SearcherManager createSearcherManager() throws EngineException {
291335
boolean success = false;
292336
SearcherManager searcherManager = null;
@@ -1132,13 +1176,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
11321176
ensureCanFlush();
11331177
try {
11341178
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
1135-
logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
1136-
Map<String, String> commitData = new HashMap<>(2);
1179+
final Map<String, String> commitData = new HashMap<>(5);
1180+
11371181
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
11381182
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
1183+
1184+
commitData.put(LocalCheckpointService.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint()));
1185+
commitData.put(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint()));
1186+
11391187
if (syncId != null) {
11401188
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
11411189
}
1190+
1191+
if (logger.isTraceEnabled()) {
1192+
logger.trace("committing writer with commit data [{}]", commitData);
1193+
}
1194+
11421195
indexWriter.setCommitData(commitData);
11431196
writer.commit();
11441197
} catch (Throwable ex) {

core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
import org.apache.lucene.document.Field;
2323
import org.apache.lucene.document.NumericDocValuesField;
2424
import org.apache.lucene.index.DocValuesType;
25+
import org.apache.lucene.index.IndexReader;
26+
import org.apache.lucene.index.LeafReader;
27+
import org.apache.lucene.index.LeafReaderContext;
28+
import org.apache.lucene.index.NumericDocValues;
2529
import org.apache.lucene.search.Query;
30+
import org.apache.lucene.util.Bits;
31+
import org.elasticsearch.action.fieldstats.FieldStats;
2632
import org.elasticsearch.common.Nullable;
2733
import org.elasticsearch.common.settings.Settings;
2834
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -108,6 +114,34 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) {
108114
throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable");
109115
}
110116

117+
@Override
118+
public FieldStats stats(IndexReader reader) throws IOException {
119+
final List<LeafReaderContext> leaves = reader.leaves();
120+
if (leaves.isEmpty()) {
121+
return null;
122+
}
123+
124+
long currentMin = Long.MAX_VALUE;
125+
long currentMax = Long.MIN_VALUE;
126+
boolean found = false;
127+
for (int i = 0; i < leaves.size(); i++) {
128+
final LeafReader leaf = leaves.get(i).reader();
129+
final NumericDocValues values = leaf.getNumericDocValues(name());
130+
if (values == null) continue;
131+
found = true;
132+
final Bits bits = leaf.getLiveDocs();
133+
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
134+
if (bits == null || bits.get(docID)) {
135+
final long value = values.get(docID);
136+
currentMin = Math.min(currentMin, value);
137+
currentMax = Math.max(currentMax, value);
138+
}
139+
}
140+
}
141+
142+
return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null;
143+
}
144+
111145
}
112146

113147
public SeqNoFieldMapper(Settings indexSettings) {
@@ -129,7 +163,7 @@ protected void parseCreateField(ParseContext context, List<Field> fields) throws
129163

130164
@Override
131165
public Mapper parse(ParseContext context) throws IOException {
132-
// _seqno added in preparse
166+
// _seq_no added in pre-parse
133167
return null;
134168
}
135169

@@ -157,4 +191,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
157191
protected void doMerge(Mapper mergeWith, boolean updateAllTypes) {
158192
// nothing to do
159193
}
194+
160195
}

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
*/
4242
public class GlobalCheckpointService extends AbstractIndexShardComponent {
4343

44+
/** Key identifying the global checkpoint; declared final so the shared constant cannot be reassigned. */
public static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
45+
4446
/**
4547
* This map holds the last known local checkpoint for every shard copy that's active.
4648
* All shard copies in this map participate in determining the global checkpoint
@@ -67,16 +69,20 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
6769
*/
6870
final private ObjectLongMap<String> trackingLocalCheckpoint;
6971

70-
private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
72+
private long globalCheckpoint;
73+
74+
/**
 * Creates a service whose global checkpoint starts at {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
 *
 * @param shardId       the shard this service belongs to
 * @param indexSettings the settings of the index the shard belongs to
 */
public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) {
    this(shardId, indexSettings, SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

/**
 * Creates a service that starts from the given global checkpoint.
 *
 * @param shardId          the shard this service belongs to
 * @param indexSettings    the settings of the index the shard belongs to; the configured number of
 *                         replicas is used to pre-size the checkpoint maps
 * @param globalCheckpoint the initial global checkpoint
 */
public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
    super(shardId, indexSettings);
    this.globalCheckpoint = globalCheckpoint;
    // the active map gets one extra slot on top of the replica count
    this.activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
    this.inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
    this.trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
}
7885

79-
8086
/**
8187
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
8288
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This is to prevent late
@@ -241,4 +247,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) {
241247
}
242248
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
243249
}
250+
244251
}

core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.index.IndexSettings;
2424
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
2525
import org.elasticsearch.index.shard.ShardId;
26+
import org.elasticsearch.index.shard.SnapshotStatus;
2627

2728
import java.util.LinkedList;
2829

@@ -32,33 +33,41 @@
3233
*/
3334
public class LocalCheckpointService extends AbstractIndexShardComponent {
3435

36+
public static String MAX_SEQ_NO = "max_seq_no";
37+
public static String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
38+
3539
/**
3640
* we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays
3741
* allocating them on demand and cleaning up while completed. This setting controls the size of the arrays
3842
*/
3943
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024,
4044
4, Setting.Property.IndexScope);
4145

42-
4346
/**
4447
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo}
4548
* which marks the seqNo the first bit in the first array corresponds to.
4649
*/
4750
final LinkedList<FixedBitSet> processedSeqNo;
4851
final int bitArraysSize;
49-
long firstProcessedSeqNo = 0;
52+
long firstProcessedSeqNo;
5053

5154
/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */
52-
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED;
55+
volatile long checkpoint;
5356

5457
/** the next available seqNo - used for seqNo generation */
55-
volatile long nextSeqNo = 0;
58+
volatile long nextSeqNo;
5659

60+
/**
 * Creates a service with both the maximum sequence number and the checkpoint set to
 * {@link SequenceNumbersService#NO_OPS_PERFORMED}.
 *
 * @param shardId       the shard this service belongs to
 * @param indexSettings the settings of the index the shard belongs to
 */
public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) {
    this(shardId, indexSettings, SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.NO_OPS_PERFORMED);
}

/**
 * Creates a service that resumes from the given maximum sequence number and checkpoint.
 *
 * @param shardId       the shard this service belongs to
 * @param indexSettings the settings of the index the shard belongs to; supplies the bit array size
 * @param maxSeqNo      the highest sequence number seen so far; generation continues at {@code maxSeqNo + 1}
 * @param checkpoint    the initial local checkpoint; the first tracked sequence number is {@code checkpoint + 1}
 */
public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) {
    super(shardId, indexSettings);
    this.bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
    this.processedSeqNo = new LinkedList<>();
    this.checkpoint = checkpoint;
    // the bit sets are anchored just after the checkpoint
    this.firstProcessedSeqNo = checkpoint + 1;
    this.nextSeqNo = maxSeqNo + 1;
}
6372

6473
/**
@@ -130,19 +139,19 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
130139
*/
131140
private FixedBitSet getBitSetForSeqNo(long seqNo) {
132141
assert Thread.holdsLock(this);
133-
assert seqNo >= firstProcessedSeqNo;
142+
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
134143
int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize;
135144
while (bitSetOffset >= processedSeqNo.size()) {
136145
processedSeqNo.add(new FixedBitSet(bitArraysSize));
137146
}
138147
return processedSeqNo.get(bitSetOffset);
139148
}
140149

141-
142150
/** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */
143151
private int seqNoToBitSetOffset(long seqNo) {
144152
assert Thread.holdsLock(this);
145153
assert seqNo >= firstProcessedSeqNo;
146154
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;
147155
}
156+
148157
}

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
3434
final LocalCheckpointService localCheckpointService;
3535
final GlobalCheckpointService globalCheckpointService;
3636

37-
public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) {
37+
/**
 * Creates the sequence number service together with its local and global checkpoint services,
 * each initialised from the supplied values.
 *
 * @param shardId          the shard this service belongs to
 * @param indexSettings    the settings of the index the shard belongs to
 * @param maxSeqNo         the highest sequence number seen so far, handed to the local checkpoint service
 * @param localCheckpoint  the initial local checkpoint, handed to the local checkpoint service
 * @param globalCheckpoint the initial global checkpoint, handed to the global checkpoint service
 */
public SequenceNumbersService(
    final ShardId shardId,
    final IndexSettings indexSettings,
    final long maxSeqNo,
    final long localCheckpoint,
    final long globalCheckpoint) {
    super(shardId, indexSettings);
    this.localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
    this.globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
}
4247

4348
/**

0 commit comments

Comments
 (0)