Skip to content

Commit 605e802

Browse files
authored
ReadOnlyEngine should update translog recovery state information (#39238)
`ReadOnlyEngine` never recovers operations from translog and never updates translog information in the index shard's recovery state, even though the recovery state goes through the `TRANSLOG` stage during the recovery. It means that recovery information for frozen shards indicates an unknown number of recovered translog ops in the Recovery APIs (translog_ops: `-1` and translog_ops_percent: `-1.0%`) and this is confusing. This commit changes the `recoverFromTranslog()` method in `ReadOnlyEngine` so that it always recovers from an empty translog snapshot, allowing the recovery state translog information to be correctly updated. Related to #33888
1 parent 1b1c157 commit 605e802

File tree

3 files changed

+98
-13
lines changed

3 files changed

+98
-13
lines changed

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.Version;
3535
import org.elasticsearch.common.lucene.Lucene;
3636
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
37+
import org.elasticsearch.common.util.concurrent.ReleasableLock;
3738
import org.elasticsearch.core.internal.io.IOUtils;
3839
import org.elasticsearch.index.mapper.MapperService;
3940
import org.elasticsearch.index.seqno.SeqNoStats;
@@ -287,18 +288,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
287288

288289
@Override
289290
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
290-
return new Translog.Snapshot() {
291-
@Override
292-
public void close() { }
293-
@Override
294-
public int totalOperations() {
295-
return 0;
296-
}
297-
@Override
298-
public Translog.Operation next() {
299-
return null;
300-
}
301-
};
291+
return newEmptySnapshot();
302292
}
303293

304294
@Override
@@ -429,7 +419,15 @@ public int fillSeqNoGaps(long primaryTerm) {
429419
}
430420

431421
@Override
432-
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) {
422+
public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
423+
try (ReleasableLock lock = readLock.acquire()) {
424+
ensureOpen();
425+
try (Translog.Snapshot snapshot = newEmptySnapshot()) {
426+
translogRecoveryRunner.run(this, snapshot);
427+
} catch (final Exception e) {
428+
throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
429+
}
430+
}
433431
return this;
434432
}
435433

@@ -468,4 +466,22 @@ protected void processReaders(IndexReader reader, IndexReader previousReader) {
468466
public boolean refreshNeeded() {
469467
return false;
470468
}
469+
470+
private Translog.Snapshot newEmptySnapshot() {
471+
return new Translog.Snapshot() {
472+
@Override
473+
public void close() {
474+
}
475+
476+
@Override
477+
public int totalOperations() {
478+
return 0;
479+
}
480+
481+
@Override
482+
public Translog.Operation next() {
483+
return null;
484+
}
485+
};
486+
}
471487
}

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,36 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
210210
}
211211
}
212212
}
213+
214+
public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
215+
IOUtils.close(engine, store);
216+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
217+
try (Store store = createStore()) {
218+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
219+
int numDocs = scaledRandomIntBetween(10, 1000);
220+
try (InternalEngine engine = createEngine(config)) {
221+
for (int i = 0; i < numDocs; i++) {
222+
if (rarely()) {
223+
continue; // gap in sequence number
224+
}
225+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
226+
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
227+
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
228+
if (rarely()) {
229+
engine.flush();
230+
}
231+
globalCheckpoint.set(i);
232+
}
233+
engine.syncTranslog();
234+
engine.flushAndClose();
235+
}
236+
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
237+
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
238+
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
239+
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
240+
241+
assertThat(translogHandler.appliedOperations(), equalTo(0L));
242+
}
243+
}
244+
}
213245
}

x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1010
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1111
import org.elasticsearch.action.delete.DeleteResponse;
12+
import org.elasticsearch.action.index.IndexResponse;
1213
import org.elasticsearch.action.search.SearchRequest;
1314
import org.elasticsearch.action.search.SearchResponse;
1415
import org.elasticsearch.action.search.SearchType;
1516
import org.elasticsearch.action.support.IndicesOptions;
1617
import org.elasticsearch.cluster.block.ClusterBlockException;
1718
import org.elasticsearch.cluster.metadata.IndexMetaData;
19+
import org.elasticsearch.cluster.routing.RecoverySource;
1820
import org.elasticsearch.common.Strings;
1921
import org.elasticsearch.common.settings.Settings;
2022
import org.elasticsearch.common.unit.TimeValue;
@@ -27,6 +29,7 @@
2729
import org.elasticsearch.index.shard.IndexShard;
2830
import org.elasticsearch.index.shard.IndexShardTestCase;
2931
import org.elasticsearch.indices.IndicesService;
32+
import org.elasticsearch.indices.recovery.RecoveryState;
3033
import org.elasticsearch.plugins.Plugin;
3134
import org.elasticsearch.rest.RestStatus;
3235
import org.elasticsearch.search.SearchService;
@@ -49,8 +52,10 @@
4952
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5053
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
5154
import static org.hamcrest.Matchers.equalTo;
55+
import static org.hamcrest.Matchers.greaterThan;
5256
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5357
import static org.hamcrest.Matchers.is;
58+
import static org.hamcrest.Matchers.notNullValue;
5459

5560
public class FrozenIndexTests extends ESSingleNodeTestCase {
5661

@@ -372,4 +377,36 @@ public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
372377
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
373378
assertIndexFrozen(indexName);
374379
}
380+
381+
public void testRecoveryState() throws ExecutionException, InterruptedException {
382+
final String indexName = "index_recovery_state";
383+
createIndex(indexName, Settings.builder()
384+
.put("index.number_of_replicas", 0)
385+
.build());
386+
387+
final long nbDocs = randomIntBetween(0, 50);
388+
for (long i = 0; i < nbDocs; i++) {
389+
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
390+
assertThat(indexResponse.status(), is(RestStatus.CREATED));
391+
}
392+
393+
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
394+
assertIndexFrozen(indexName);
395+
396+
final IndexMetaData indexMetaData = client().admin().cluster().prepareState().get().getState().metaData().index(indexName);
397+
final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(indexMetaData.getIndex());
398+
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
399+
final IndexShard indexShard = indexService.getShardOrNull(i);
400+
assertThat("Shard [" + i + "] is missing for index " + indexMetaData.getIndex(), indexShard, notNullValue());
401+
final RecoveryState recoveryState = indexShard.recoveryState();
402+
assertThat(recoveryState.getRecoverySource(), is(RecoverySource.ExistingStoreRecoverySource.INSTANCE));
403+
assertThat(recoveryState.getStage(), is(RecoveryState.Stage.DONE));
404+
assertThat(recoveryState.getTargetNode(), notNullValue());
405+
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
406+
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
407+
assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(0));
408+
assertThat(recoveryState.getTranslog().totalOperations(), equalTo(0));
409+
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
410+
}
411+
}
375412
}

0 commit comments

Comments
 (0)