Skip to content

Commit 8afe09a

Browse files
authored
Pass TranslogRecoveryRunner to engine from outside (#33449)
This commit allows us to use different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR. Relates #32867
1 parent 443f9ca commit 8afe09a

File tree

9 files changed

+67
-78
lines changed

9 files changed

+67
-78
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,9 +1642,10 @@ public interface Warmer {
16421642
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
16431643
* This operation will close the engine if the recovery fails.
16441644
*
1645-
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
1645+
* @param translogRecoveryRunner the translog recovery runner
1646+
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
16461647
*/
1647-
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
1648+
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
16481649

16491650
/**
16501651
* Do not replay translog operations, but make the engine be ready.
@@ -1662,4 +1663,9 @@ public boolean isRecovering() {
16621663
* Tries to prune buffered deletes from the version map.
16631664
*/
16641665
public abstract void maybePruneDeletes();
1666+
1667+
@FunctionalInterface
1668+
public interface TranslogRecoveryRunner {
1669+
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
1670+
}
16651671
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,11 @@
3737
import org.elasticsearch.index.mapper.ParsedDocument;
3838
import org.elasticsearch.index.shard.ShardId;
3939
import org.elasticsearch.index.store.Store;
40-
import org.elasticsearch.index.translog.Translog;
4140
import org.elasticsearch.index.translog.TranslogConfig;
4241
import org.elasticsearch.indices.IndexingMemoryController;
4342
import org.elasticsearch.indices.breaker.CircuitBreakerService;
4443
import org.elasticsearch.threadpool.ThreadPool;
4544

46-
import java.io.IOException;
4745
import java.util.List;
4846
import java.util.function.LongSupplier;
4947

@@ -76,7 +74,6 @@ public final class EngineConfig {
7674
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
7775
@Nullable
7876
private final Sort indexSort;
79-
private final TranslogRecoveryRunner translogRecoveryRunner;
8077
@Nullable
8178
private final CircuitBreakerService circuitBreakerService;
8279
private final LongSupplier globalCheckpointSupplier;
@@ -127,9 +124,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
127124
TranslogConfig translogConfig, TimeValue flushMergesAfter,
128125
List<ReferenceManager.RefreshListener> externalRefreshListener,
129126
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
130-
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
131-
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
132-
TombstoneDocSupplier tombstoneDocSupplier) {
127+
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
128+
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
133129
this.shardId = shardId;
134130
this.allocationId = allocationId;
135131
this.indexSettings = indexSettings;
@@ -163,7 +159,6 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
163159
this.externalRefreshListener = externalRefreshListener;
164160
this.internalRefreshListener = internalRefreshListener;
165161
this.indexSort = indexSort;
166-
this.translogRecoveryRunner = translogRecoveryRunner;
167162
this.circuitBreakerService = circuitBreakerService;
168163
this.globalCheckpointSupplier = globalCheckpointSupplier;
169164
this.primaryTermSupplier = primaryTermSupplier;
@@ -324,18 +319,6 @@ public TranslogConfig getTranslogConfig() {
324319
*/
325320
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
326321

327-
@FunctionalInterface
328-
public interface TranslogRecoveryRunner {
329-
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
330-
}
331-
332-
/**
333-
* Returns a runner that implements the translog recovery from the given snapshot
334-
*/
335-
public TranslogRecoveryRunner getTranslogRecoveryRunner() {
336-
return translogRecoveryRunner;
337-
}
338-
339322
/**
340323
* The refresh listeners to add to Lucene for externally visible refreshes
341324
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -393,15 +393,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
393393
}
394394

395395
@Override
396-
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
396+
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
397397
flushLock.lock();
398398
try (ReleasableLock lock = readLock.acquire()) {
399399
ensureOpen();
400400
if (pendingTranslogRecovery.get() == false) {
401401
throw new IllegalStateException("Engine has already been recovered");
402402
}
403403
try {
404-
recoverFromTranslogInternal(recoverUpToSeqNo);
404+
recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
405405
} catch (Exception e) {
406406
try {
407407
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -423,13 +423,13 @@ public void skipTranslogRecovery() {
423423
pendingTranslogRecovery.set(false); // we are good - now we can commit
424424
}
425425

426-
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
426+
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
427427
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
428428
final int opsRecovered;
429429
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
430430
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
431431
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
432-
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
432+
opsRecovered = translogRecoveryRunner.run(this, snapshot);
433433
} catch (Exception e) {
434434
throw new EngineException(shardId, "failed to recover from translog", e);
435435
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,7 +1314,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13141314
**/
13151315
public void openEngineAndRecoverFromTranslog() throws IOException {
13161316
innerOpenEngineAndTranslog();
1317-
getEngine().recoverFromTranslog(Long.MAX_VALUE);
1317+
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
13181318
}
13191319

13201320
/**
@@ -2233,7 +2233,7 @@ private EngineConfig newEngineConfig() {
22332233
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
22342234
Collections.singletonList(refreshListeners),
22352235
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
2236-
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
2236+
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
22372237
}
22382238

22392239
/**

0 commit comments

Comments
 (0)