core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java (103 changes: 58 additions & 45 deletions)
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
Expand All @@ -59,7 +58,6 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand All @@ -86,8 +84,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;

public class InternalEngine extends Engine {

/**
Expand Down Expand Up @@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling; otherwise we throttle
Expand Down Expand Up @@ -285,7 +282,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
Expand All @@ -310,7 +307,7 @@ public Translog getTranslog() {
private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = writer.getCommitData();
final Map<String, String> commitUserData = commitDataAsMap(writer);
if (commitUserData.containsKey("translog_id")) {
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
Expand All @@ -326,32 +323,20 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
final long maxSeqNo;
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
maxSeqNo = (long) stats.getMaxValue();
} else {
maxSeqNo = NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
maxSeqNo = Long.parseLong(entry.getValue());
}
}

final Map<String, String> commitUserData = writer.getCommitData();

final long localCheckpoint;
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
} else {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
}

final long globalCheckpoint;
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
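
For readers following the new loading path above: it walks the live commit data once and falls back to a default when a key is absent. Below is a self-contained sketch of the same idea. The plain `Map`, the `long[]` return value, and the literal defaults (-1 for "no ops performed", -2 for "unassigned") are stand-ins assumed for illustration; the engine itself uses the `SequenceNumbersService` constants and returns a `SeqNoStats` object.

```java
import java.util.HashMap;
import java.util.Map;

final class SeqNoStatsSketch {
    // Stand-ins for SequenceNumbersService.NO_OPS_PERFORMED / UNASSIGNED_SEQ_NO (assumed values).
    static final long NO_OPS_PERFORMED = -1L;
    static final long UNASSIGNED_SEQ_NO = -2L;

    /** Returns {maxSeqNo, localCheckpoint, globalCheckpoint} parsed from commit user data. */
    static long[] fromCommitData(Map<String, String> commitData) {
        long maxSeqNo = NO_OPS_PERFORMED;
        long localCheckpoint = NO_OPS_PERFORMED;
        long globalCheckpoint = UNASSIGNED_SEQ_NO;
        for (Map.Entry<String, String> entry : commitData.entrySet()) {
            final String key = entry.getKey();
            if (key.equals("local_checkpoint")) {
                localCheckpoint = Long.parseLong(entry.getValue());
            } else if (key.equals("global_checkpoint")) {
                globalCheckpoint = Long.parseLong(entry.getValue());
            } else if (key.equals("max_seq_no")) {
                maxSeqNo = Long.parseLong(entry.getValue());
            }
            // other commit data keys (translog generation, UUID, sync id) are ignored here
        }
        return new long[] { maxSeqNo, localCheckpoint, globalCheckpoint };
    }

    public static void main(String[] args) {
        Map<String, String> commitData = new HashMap<>();
        commitData.put("max_seq_no", "42");
        commitData.put("local_checkpoint", "40");
        // no global checkpoint in this commit, so it falls back to UNASSIGNED_SEQ_NO
        long[] stats = fromCommitData(commitData);
        System.out.println("max_seq_no=" + stats[0] + " local=" + stats[1] + " global=" + stats[2]);
    }
}
```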

Expand Down Expand Up @@ -1323,23 +1308,39 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final Map<String, String> commitData = new HashMap<>(5);

commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);

commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));

if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. the local and global checkpoints) must be evaluated *before* Lucene
* flushes segments. The maximum sequence number is different: we never want it to be less than the last
* sequence number that went into a Lucene commit, otherwise we run the risk of re-using a sequence number
* for two different documents when restoring from this commit point and subsequently writing new documents
* to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of max_seq_no until the
* commit data iterator is invoked (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
return commitData.entrySet().iterator();
});

indexWriter.setCommitData(commitData);
writer.commit();
} catch (Exception ex) {
try {
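
The property the comment above relies on is that `IndexWriter#setLiveCommitData(Iterable)` does not copy the entries eagerly: the iterator is pulled while the commit itself runs, so values computed inside the supplier reflect the state at commit time. Below is a minimal sketch of that shape, with a plain counter standing in for `seqNoService().getMaxSeqNo()`. It assumes a Lucene 6.2+/7.x-era setup where `RAMDirectory` is still available; the key name and the printed values are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

public class LazyCommitDataSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for seqNoService().getMaxSeqNo(); in the engine this advances as documents are indexed.
        AtomicLong maxSeqNo = new AtomicLong(0);

        try (RAMDirectory dir = new RAMDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            // The supplier is registered here but *not* evaluated yet ...
            writer.setLiveCommitData(() -> {
                Map<String, String> commitData = new HashMap<>();
                commitData.put("max_seq_no", Long.toString(maxSeqNo.get()));
                return commitData.entrySet().iterator();
            });

            maxSeqNo.set(7); // ... so a change made before the commit is still picked up ...

            writer.commit(); // ... because the iterator is pulled while committing.

            for (IndexCommit commit : DirectoryReader.listCommits(dir)) {
                System.out.println(commit.getUserData()); // expected: {max_seq_no=7}
            }
        }
    }
}
```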
Expand Down Expand Up @@ -1395,7 +1396,8 @@ public MergeStats getMergeStats() {
public SequenceNumbersService seqNoService() {
return seqNoService;
}
@Override

@Override
public DocsStats getDocStats() {
final int numDocs = indexWriter.numDocs();
final int maxDoc = indexWriter.maxDoc();
Expand Down Expand Up @@ -1441,4 +1443,15 @@ boolean indexWriterHasDeletions() {
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}

/**
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
return commitData;
}
}
Expand Up @@ -81,6 +81,13 @@ public long generateSeqNo() {
return localCheckpointService.generateSeqNo();
}

/**
* Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
*/
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
}

/**
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} for
* more details
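
The new `getMaxSeqNo()` accessor is what the engine reads when it writes the `max_seq_no` commit entry above. To make the difference between the maximum issued sequence number and the local checkpoint concrete, here is a small, hypothetical tracker in the spirit of `LocalCheckpointService`: the maximum sequence number tracks the highest value ever issued, while the checkpoint only advances over the contiguous prefix of completed operations. The class name, fields, and in-memory `Set` are illustrative assumptions, not the actual Elasticsearch implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for LocalCheckpointService (illustration only).
final class SimpleCheckpointTracker {
    private long nextSeqNo = 0;   // next sequence number to issue
    private long checkpoint = -1; // highest seq no such that all lower seq nos are completed
    private final Set<Long> completed = new HashSet<>();

    synchronized long generateSeqNo() {
        return nextSeqNo++;
    }

    /** Highest sequence number issued so far, or -1 if none (mirrors getMaxSeqNo()). */
    synchronized long getMaxSeqNo() {
        return nextSeqNo - 1;
    }

    /** Marks an operation as completed and advances the checkpoint over the contiguous prefix. */
    synchronized void markSeqNoAsCompleted(long seqNo) {
        completed.add(seqNo);
        while (completed.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    synchronized long getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        SimpleCheckpointTracker tracker = new SimpleCheckpointTracker();
        long s0 = tracker.generateSeqNo(); // 0
        long s1 = tracker.generateSeqNo(); // 1
        long s2 = tracker.generateSeqNo(); // 2
        tracker.markSeqNoAsCompleted(s0);
        tracker.markSeqNoAsCompleted(s2); // s1 still pending
        // max seq no has moved to 2, but the checkpoint is stuck at 0 until s1 completes
        System.out.println("maxSeqNo=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint());
        tracker.markSeqNoAsCompleted(s1);
        System.out.println("maxSeqNo=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint());
    }
}
```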