Skip to content

Commit b977f01

Browse files
authored
Expose translog stats in ReadOnlyEngine (#43752) (#43823)
Backport of #43752 for 7.x.
1 parent c8ed271 commit b977f01

File tree

7 files changed

+253
-4
lines changed

7 files changed

+253
-4
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,47 @@ setup:
7979
indices.stats:
8080
metric: [ translog ]
8181
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }
82+
83+
---
84+
"Translog stats on closed indices":
85+
- skip:
86+
version: " - 7.2.99"
87+
reason: "closed indices have translog stats starting version 7.3.0"
88+
89+
- do:
90+
index:
91+
index: test
92+
id: 1
93+
body: { "foo": "bar" }
94+
95+
- do:
96+
index:
97+
index: test
98+
id: 2
99+
body: { "foo": "bar" }
100+
101+
- do:
102+
index:
103+
index: test
104+
id: 3
105+
body: { "foo": "bar" }
106+
107+
- do:
108+
indices.stats:
109+
metric: [ translog ]
110+
- match: { indices.test.primaries.translog.operations: 3 }
111+
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }
112+
113+
- do:
114+
indices.close:
115+
index: test
116+
wait_for_active_shards: 1
117+
- is_true: acknowledged
118+
119+
- do:
120+
indices.stats:
121+
metric: [ translog ]
122+
expand_wildcards: all
123+
forbid_closed_indices: false
124+
- match: { indices.test.primaries.translog.operations: 3 }
125+
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.elasticsearch.index.shard.DocsStats;
4343
import org.elasticsearch.index.store.Store;
4444
import org.elasticsearch.index.translog.Translog;
45+
import org.elasticsearch.index.translog.TranslogConfig;
46+
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
4547
import org.elasticsearch.index.translog.TranslogStats;
4648

4749
import java.io.Closeable;
@@ -108,7 +110,6 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
108110
// yet this makes sure nobody else does. including some testing tools that try to be messy
109111
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
110112
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
111-
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
112113
if (seqNoStats == null) {
113114
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
114115
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
@@ -119,6 +120,8 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
119120
reader = wrapReader(reader, readerWrapperFunction);
120121
searcherManager = new SearcherManager(reader, searcherFactory);
121122
this.docsStats = docsStats(lastCommittedSegmentInfos);
123+
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
124+
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
122125
this.indexWriterLock = indexWriterLock;
123126
success = true;
124127
} finally {
@@ -216,6 +219,26 @@ private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos info
216219
return new SeqNoStats(maxSeqNo, localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
217220
}
218221

222+
private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
223+
final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
224+
if (translogUuid == null) {
225+
throw new IllegalStateException("commit doesn't contain translog unique id");
226+
}
227+
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
228+
final TranslogConfig translogConfig = config.getTranslogConfig();
229+
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
230+
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
231+
config.getIndexSettings().getTranslogRetentionAge().getMillis()
232+
);
233+
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);
234+
235+
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
236+
config.getPrimaryTermSupplier(), seqNo -> {})
237+
) {
238+
return translog.stats();
239+
}
240+
}
241+
219242
@Override
220243
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
221244
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.index.seqno.SeqNoStats;
3030
import org.elasticsearch.index.seqno.SequenceNumbers;
3131
import org.elasticsearch.index.store.Store;
32+
import org.elasticsearch.index.translog.TranslogStats;
3233

3334
import java.io.IOException;
3435
import java.util.List;
@@ -37,6 +38,7 @@
3738

3839
import static org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader.getElasticsearchDirectoryReader;
3940
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.greaterThan;
4042
import static org.hamcrest.Matchers.instanceOf;
4143

4244
public class ReadOnlyEngineTests extends EngineTestCase {
@@ -183,7 +185,7 @@ public void testReadOnly() throws IOException {
183185
try (Store store = createStore()) {
184186
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
185187
store.createEmpty(Version.CURRENT.luceneVersion);
186-
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
188+
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
187189
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
188190
UnsupportedOperationException.class;
189191
expectThrows(expectedException, () -> readOnlyEngine.index(null));
@@ -204,7 +206,7 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
204206
try (Store store = createStore()) {
205207
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
206208
store.createEmpty(Version.CURRENT.luceneVersion);
207-
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
209+
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
208210
globalCheckpoint.set(randomNonNegativeLong());
209211
try {
210212
readOnlyEngine.verifyEngineBeforeIndexClosing();
@@ -242,4 +244,46 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
242244
}
243245
}
244246
}
247+
248+
public void testTranslogStats() throws IOException {
249+
IOUtils.close(engine, store);
250+
try (Store store = createStore()) {
251+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
252+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
253+
254+
final int numDocs = frequently() ? scaledRandomIntBetween(10, 200) : 0;
255+
int uncommittedDocs = 0;
256+
257+
try (InternalEngine engine = createEngine(config)) {
258+
for (int i = 0; i < numDocs; i++) {
259+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
260+
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
261+
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
262+
if (rarely()) {
263+
engine.flush();
264+
uncommittedDocs = 0;
265+
} else {
266+
uncommittedDocs += 1;
267+
}
268+
globalCheckpoint.set(i);
269+
}
270+
271+
assertThat(engine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
272+
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
273+
assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
274+
assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
275+
assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
276+
277+
engine.flush(true, true);
278+
}
279+
280+
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
281+
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
282+
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
283+
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
284+
assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
285+
assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
286+
}
287+
}
288+
}
245289
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4083,7 +4083,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
40834083
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
40844084
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
40854085
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
4086-
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, false, Function.identity()) {
4086+
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
40874087
@Override
40884088
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
40894089
// just like a following shard, we need to skip this check for now.

server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import org.elasticsearch.action.ActionRequestValidationException;
2323
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2424
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
25+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
2526
import org.elasticsearch.action.index.IndexRequestBuilder;
27+
import org.elasticsearch.action.index.IndexResponse;
2628
import org.elasticsearch.action.search.SearchResponse;
29+
import org.elasticsearch.action.support.ActiveShardCount;
2730
import org.elasticsearch.action.support.IndicesOptions;
2831
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2932
import org.elasticsearch.client.Client;
@@ -34,6 +37,7 @@
3437
import org.elasticsearch.common.xcontent.XContentType;
3538
import org.elasticsearch.index.IndexNotFoundException;
3639
import org.elasticsearch.index.query.QueryBuilders;
40+
import org.elasticsearch.rest.RestStatus;
3741
import org.elasticsearch.test.ESIntegTestCase;
3842

3943
import java.io.IOException;
@@ -54,6 +58,7 @@
5458
import static org.hamcrest.Matchers.containsString;
5559
import static org.hamcrest.Matchers.equalTo;
5660
import static org.hamcrest.Matchers.is;
61+
import static org.hamcrest.Matchers.notNullValue;
5762

5863
public class OpenCloseIndexIT extends ESIntegTestCase {
5964
public void testSimpleCloseOpen() {
@@ -346,4 +351,38 @@ public void testOpenCloseIndexWithBlocks() {
346351
}
347352
}
348353
}
354+
355+
public void testTranslogStats() {
356+
final String indexName = "test";
357+
createIndex(indexName, Settings.builder()
358+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
359+
.build());
360+
361+
final int nbDocs = randomIntBetween(0, 50);
362+
int uncommittedOps = 0;
363+
for (long i = 0; i < nbDocs; i++) {
364+
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
365+
assertThat(indexResponse.status(), is(RestStatus.CREATED));
366+
367+
if (rarely()) {
368+
client().admin().indices().prepareFlush(indexName).get();
369+
uncommittedOps = 0;
370+
} else {
371+
uncommittedOps += 1;
372+
}
373+
}
374+
375+
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
376+
assertThat(stats.getIndex(indexName), notNullValue());
377+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
378+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));
379+
380+
assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.ONE));
381+
382+
IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_CLOSED;
383+
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
384+
assertThat(stats.getIndex(indexName), notNullValue());
385+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
386+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
387+
}
349388
}

x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,4 +409,39 @@ public void testRecoveryState() throws ExecutionException, InterruptedException
409409
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
410410
}
411411
}
412+
413+
public void testTranslogStats() throws Exception {
414+
final String indexName = "test";
415+
createIndex(indexName, Settings.builder()
416+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
417+
.build());
418+
419+
final int nbDocs = randomIntBetween(0, 50);
420+
int uncommittedOps = 0;
421+
for (long i = 0; i < nbDocs; i++) {
422+
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
423+
assertThat(indexResponse.status(), is(RestStatus.CREATED));
424+
425+
if (rarely()) {
426+
client().admin().indices().prepareFlush(indexName).get();
427+
uncommittedOps = 0;
428+
} else {
429+
uncommittedOps += 1;
430+
}
431+
}
432+
433+
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
434+
assertThat(stats.getIndex(indexName), notNullValue());
435+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
436+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));
437+
438+
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
439+
assertIndexFrozen(indexName);
440+
441+
IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_CLOSED;
442+
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
443+
assertThat(stats.getIndex(indexName), notNullValue());
444+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
445+
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
446+
}
412447
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
---
2+
setup:
3+
- do:
4+
indices.create:
5+
index: test
6+
- do:
7+
cluster.health:
8+
wait_for_no_initializing_shards: true
9+
10+
---
11+
"Translog stats on frozen indices":
12+
- skip:
13+
version: " - 7.2.99"
14+
reason: "frozen indices have translog stats starting version 7.3.0"
15+
16+
- do:
17+
index:
18+
index: test
19+
id: 1
20+
body: { "foo": "bar" }
21+
22+
- do:
23+
index:
24+
index: test
25+
id: 2
26+
body: { "foo": "bar" }
27+
28+
- do:
29+
index:
30+
index: test
31+
id: 3
32+
body: { "foo": "bar" }
33+
34+
- do:
35+
indices.stats:
36+
metric: [ translog ]
37+
- match: { indices.test.primaries.translog.operations: 3 }
38+
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }
39+
40+
# freeze index
41+
- do:
42+
indices.freeze:
43+
index: test
44+
wait_for_active_shards: 1
45+
- is_true: acknowledged
46+
47+
- do:
48+
indices.stats:
49+
metric: [ translog ]
50+
- match: { indices.test.primaries.translog.operations: 3 }
51+
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
52+
53+
# unfreeze index
54+
- do:
55+
indices.freeze:
56+
index: test
57+
wait_for_active_shards: 1
58+
- is_true: acknowledged
59+
60+
- do:
61+
indices.stats:
62+
metric: [ translog ]
63+
- match: { indices.test.primaries.translog.operations: 3 }
64+
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

0 commit comments

Comments
 (0)