Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
cluster.health:
wait_for_no_initializing_shards: true
wait_for_events: languid
# Before 8.0, an empty shard has two empty translog files because the translog_generation commit tag was used as the minimum
# required translog generation for recovery. Here we force-flush to get consistent translog stats for both old and new indices.
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats:
metric: [ translog ]
Expand Down Expand Up @@ -37,10 +44,9 @@
- do:
indices.stats:
metric: [ translog ]
# after flushing we have one empty translog file while an empty index before flushing has two empty translog files.
- lt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}

protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
return translogRecoveryRunner.run(this, snapshot);
}
}
Expand Down Expand Up @@ -458,23 +458,24 @@ public void skipTranslogRecovery() {
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
} else {
opsRecovered = 0;
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
Expand All @@ -486,7 +487,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {

final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog; otherwise something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer);
Expand Down Expand Up @@ -551,18 +553,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}

/**
 * Looks up the translog UUID recorded in the user data of the last committed segments info read from the store.
 *
 * @return the value stored under {@link Translog#TRANSLOG_UUID_KEY}, or {@code null} when the commit has no such entry
 * @throws IllegalStateException if the commit user data has no {@link Translog#TRANSLOG_GENERATION_KEY} entry
 * @throws IOException propagated from reading the last committed segments info
 */
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
    final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
    // The generation key is validated first; the UUID itself is allowed to be absent (hence @Nullable).
    if (userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) {
        return userData.get(Translog.TRANSLOG_UUID_KEY);
    }
    throw new IllegalStateException("commit doesn't contain translog generation id");
}

/**
* Reads the current stored history ID from the IW commit data.
*/
Expand Down Expand Up @@ -1588,8 +1578,10 @@ public boolean shouldPeriodicallyFlush() {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
Expand Down Expand Up @@ -2281,11 +2273,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
Expand All @@ -2296,10 +2283,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
Expand Down
31 changes: 12 additions & 19 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
}
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,10 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,10 +1449,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
updateCommitData(writer, Map.of(Translog.TRANSLOG_UUID_KEY, translogUUID));
} finally {
metadataLock.writeLock().unlock();
}
Expand Down
Loading