Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,29 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer);
if (logger.isTraceEnabled()) {
logger.trace(
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
final SeqNoStats seqNoStats;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
seqNoService =
new SequenceNumbersService(
shardId,
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
logger.trace("recovered [{}]", seqNoStats);
indexWriter = writer;
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -209,6 +215,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
logger.trace("created new InternalEngine");
}

private static SequenceNumbersService sequenceNumberService(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what you get from the crazy syntax 👅

final ShardId shardId,
final IndexSettings indexSettings,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
shardId,
indexSettings,
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -326,18 +344,33 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
}

/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog
* checkpoint (global checkpoint).
*
* @param engineConfig the engine configuration (for the open mode and the translog path)
* @param writer the index writer (for the Lucene commit point)
* @param translogConfig the translog config (for the global checkpoint)
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException {
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
final TranslogConfig translogConfig,
final IndexWriter indexWriter) throws IOException {
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
return loadSeqNoStatsLucene(globalCheckpoint, indexWriter);
}

/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the
* specified global checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
Expand All @@ -348,13 +381,6 @@ private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final
}
}

final long globalCheckpoint;
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

Expand Down