From 6e5429fbfaef31651dbfe71c36074e5d7ddfe40b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 2 Dec 2016 06:23:58 -0500 Subject: [PATCH 1/2] Clarify global checkpoint recovery Today when starting a new engine, we read the global checkpoint from the translog only if we are opening an existing translog. This commit clarifies this situation by distinguishing the three cases of engine creation in the constructor leading to clearer code. --- .../index/engine/InternalEngine.java | 84 +++++++++++++------ 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a361a53450c71..08e30b7da2f1d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -154,23 +154,33 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { - writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); - final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer); - if (logger.isTraceEnabled()) { - logger.trace( - "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", - seqNoStats.getMaxSeqNo(), - seqNoStats.getLocalCheckpoint(), - seqNoStats.getGlobalCheckpoint()); + final SeqNoStats seqNoStats; + switch (openMode) { + case OPEN_INDEX_AND_TRANSLOG: + writer = createWriter(false); + seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer); + break; + case OPEN_INDEX_CREATE_TRANSLOG: + writer = createWriter(false); + seqNoStats = loadSeqNoStatsLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer); + break; + case CREATE_INDEX_AND_TRANSLOG: + writer = createWriter(true); + seqNoStats = new SeqNoStats( + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.UNASSIGNED_SEQ_NO); + break; + default: + throw new IllegalArgumentException(openMode.toString()); } - seqNoService = - new SequenceNumbersService( - shardId, - engineConfig.getIndexSettings(), - seqNoStats.getMaxSeqNo(), - seqNoStats.getLocalCheckpoint(), - seqNoStats.getGlobalCheckpoint()); + logger.trace( + "recovered max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); indexWriter = writer; + seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { @@ -209,6 +219,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("created new InternalEngine"); } + private static SequenceNumbersService sequenceNumberService( + final ShardId shardId, + final IndexSettings indexSettings, + final SeqNoStats seqNoStats) { + return new SequenceNumbersService( + shardId, + indexSettings, + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); + } + @Override public InternalEngine recoverFromTranslog() throws IOException { flushLock.lock(); @@ -326,18 +348,33 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) } /** - * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog + * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog * checkpoint (global checkpoint). * - * @param engineConfig the engine configuration (for the open mode and the translog path) - * @param writer the index writer (for the Lucene commit point) + * @param translogConfig the translog config (for the global checkpoint) + * @param indexWriter the index writer (for the Lucene commit point) * @return the sequence number stats * @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint */ - private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException { + private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog( + final TranslogConfig translogConfig, + final IndexWriter indexWriter) throws IOException { + long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()); + return loadSeqNoStatsLucene(globalCheckpoint, indexWriter); + } + + /** + * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the + * specified global checkpoint. + * + * @param globalCheckpoint the global checkpoint to use + * @param indexWriter the index writer (for the Lucene commit point) + * @return the sequence number stats + */ + private static SeqNoStats loadSeqNoStatsLucene(final long globalCheckpoint, final IndexWriter indexWriter) { long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; - for (Map.Entry entry : writer.getLiveCommitData()) { + for (Map.Entry entry : indexWriter.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(LOCAL_CHECKPOINT_KEY)) { assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED; @@ -348,13 +385,6 @@ private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final } } - final long globalCheckpoint; - if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); - } else { - globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); } From 7a48c392ca221b952c5ccdb0e15c1eed94308bd1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 2 Dec 2016 14:40:47 -0500 Subject: [PATCH 2/2] Simplify trace logging statement in InternalEngine In the constructor for the internal engine, we log a trace statement showing the recovered max sequence number, local checkpoint, and global checkpoint. This commit simplifies this logging statement to just use the fact that SeqNoStats implements toString. --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 08e30b7da2f1d..90e9616ef261f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -174,11 +174,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { default: throw new IllegalArgumentException(openMode.toString()); } - logger.trace( - "recovered max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", - seqNoStats.getMaxSeqNo(), - seqNoStats.getLocalCheckpoint(), - seqNoStats.getGlobalCheckpoint()); + logger.trace("recovered [{}]", seqNoStats); indexWriter = writer; seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);