|
58 | 58 | import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; |
59 | 59 | import org.elasticsearch.index.merge.MergeStats; |
60 | 60 | import org.elasticsearch.index.merge.OnGoingMerge; |
| 61 | +import org.elasticsearch.index.seqno.SeqNoStats; |
61 | 62 | import org.elasticsearch.index.seqno.SequenceNumbersService; |
62 | 63 | import org.elasticsearch.index.shard.DocsStats; |
63 | 64 | import org.elasticsearch.index.shard.ElasticsearchMergePolicy; |
|
80 | 81 | import java.util.concurrent.locks.ReentrantLock; |
81 | 82 | import java.util.function.Function; |
82 | 83 |
|
| 84 | +import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED; |
| 85 | + |
83 | 86 | /** |
84 | 87 | * |
85 | 88 | */ |
@@ -142,17 +145,21 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { |
142 | 145 | this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); |
143 | 146 | try { |
144 | 147 | writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); |
145 | | - final long localCheckpoint = loadLocalCheckpointFromCommit(writer); |
146 | | - final long globalCheckpoint = loadGlobalCheckpointFromCommit(writer); |
147 | | - final long maxSeqNo = loadMaxSeqNoFromCommit(writer); |
| 148 | + final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); |
148 | 149 | if (logger.isTraceEnabled()) { |
149 | 150 | logger.trace( |
150 | | - "recovering local checkpoint: [{}], global checkpoint [{}], max sequence number [{}]", |
151 | | - localCheckpoint, |
152 | | - globalCheckpoint, |
153 | | - maxSeqNo); |
| 151 | + "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", |
| 152 | + seqNoStats.getMaxSeqNo(), |
| 153 | + seqNoStats.getLocalCheckpoint(), |
| 154 | + seqNoStats.getGlobalCheckpoint()); |
154 | 155 | } |
155 | | - seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings(), maxSeqNo, localCheckpoint, globalCheckpoint); |
| 156 | + seqNoService = |
| 157 | + new SequenceNumbersService( |
| 158 | + shardId, |
| 159 | + engineConfig.getIndexSettings(), |
| 160 | + seqNoStats.getMaxSeqNo(), |
| 161 | + seqNoStats.getLocalCheckpoint(), |
| 162 | + seqNoStats.getGlobalCheckpoint()); |
156 | 163 | indexWriter = writer; |
157 | 164 | translog = openTranslog(engineConfig, writer); |
158 | 165 | assert translog.getGeneration() != null; |
@@ -303,33 +310,34 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) |
303 | 310 | return null; |
304 | 311 | } |
305 | 312 |
|
306 | | - private long loadLocalCheckpointFromCommit(IndexWriter writer) { |
| 313 | + private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { |
| 314 | + final long maxSeqNo; |
| 315 | + try (IndexReader reader = DirectoryReader.open(writer)) { |
| 316 | + final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); |
| 317 | + if (stats != null) { |
| 318 | + maxSeqNo = (long) stats.getMaxValue(); |
| 319 | + } else { |
| 320 | + maxSeqNo = NO_OPS_PERFORMED; |
| 321 | + } |
| 322 | + } |
| 323 | + |
307 | 324 | final Map<String, String> commitUserData = writer.getCommitData(); |
| 325 | + |
| 326 | + final long localCheckpoint; |
308 | 327 | if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) { |
309 | | - return Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); |
| 328 | + localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); |
310 | 329 | } else { |
311 | | - return SequenceNumbersService.NO_OPS_PERFORMED; |
| 330 | + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; |
312 | 331 | } |
313 | | - } |
314 | 332 |
|
315 | | - private long loadGlobalCheckpointFromCommit(IndexWriter writer) { |
316 | | - final Map<String, String> commitUserData = writer.getCommitData(); |
| 333 | + final long globalCheckpoint; |
317 | 334 | if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) { |
318 | | - return Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); |
| 335 | + globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); |
319 | 336 | } else { |
320 | | - return SequenceNumbersService.UNASSIGNED_SEQ_NO; |
| 337 | + globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; |
321 | 338 | } |
322 | | - } |
323 | 339 |
|
324 | | - private long loadMaxSeqNoFromCommit(IndexWriter writer) throws IOException { |
325 | | - try (IndexReader reader = DirectoryReader.open(writer)) { |
326 | | - final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); |
327 | | - if (stats != null) { |
328 | | - return (long) stats.getMaxValue(); |
329 | | - } else { |
330 | | - return SequenceNumbersService.NO_OPS_PERFORMED; |
331 | | - } |
332 | | - } |
| 340 | + return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); |
333 | 341 | } |
334 | 342 |
|
335 | 343 | private SearcherManager createSearcherManager() throws EngineException { |
|
0 commit comments