Skip to content

Commit f3b04dc

Browse files
committed
assertSeqNos
1 parent 530addd commit f3b04dc

File tree

4 files changed

+65
-57
lines changed

4 files changed

+65
-57
lines changed

core/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ private void maybeTrimTranslog() {
724724
}
725725
}
726726

727-
private void syncGlobalCheckpoints() {
727+
private void maybeSyncGlobalCheckpoints() {
728728
for (final IndexShard shard : this.shards.values()) {
729729
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
730730
switch (shard.state()) {
@@ -950,7 +950,7 @@ final class AsyncGlobalCheckpointTask extends BaseAsyncTask {
950950

951951
@Override
952952
protected void runInternal() {
953-
indexService.syncGlobalCheckpoints();
953+
indexService.maybeSyncGlobalCheckpoints();
954954
}
955955

956956
@Override

core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ public void testAckedIndexing() throws Exception {
9292
final List<String> nodes = startCluster(rarely() ? 5 : 3);
9393

9494
assertAcked(prepareCreate("test")
95-
.setSettings(Settings.builder()
96-
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
97-
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
98-
));
95+
.setSettings(Settings.builder()
96+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
97+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
98+
));
9999
ensureGreen();
100100

101101
ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
@@ -142,8 +142,8 @@ public void testAckedIndexing() throws Exception {
142142
exceptedExceptions.add(e);
143143
final String docId = id;
144144
logger.trace(
145-
(Supplier<?>)
146-
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
145+
(Supplier<?>)
146+
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
147147
} finally {
148148
countDownLatchRef.get().countDown();
149149
logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
@@ -190,12 +190,12 @@ public void testAckedIndexing() throws Exception {
190190
disruptionScheme.stopDisrupting();
191191
for (String node : internalCluster().getNodeNames()) {
192192
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
193-
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
193+
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
194194
}
195195
// in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the master
196196
// is the super-connected node and recovery source and target are on opposite sides of the bridge
197197
if (disruptionScheme instanceof NetworkDisruption &&
198-
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
198+
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
199199
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
200200
}
201201
ensureGreen("test");
@@ -207,14 +207,16 @@ public void testAckedIndexing() throws Exception {
207207
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
208208
for (String id : ackedDocs.keySet()) {
209209
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
210-
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
210+
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
211211
}
212212
} catch (AssertionError | NoShardAvailableActionException e) {
213213
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
214214
}
215215
}
216216
}, 30, TimeUnit.SECONDS);
217217

218+
assertSeqNos();
219+
218220
logger.info("done validating (iteration [{}])", iter);
219221
}
220222
} finally {

core/src/test/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,16 @@
2020
package org.elasticsearch.recovery;
2121

2222
import com.carrotsearch.hppc.IntHashSet;
23-
import com.carrotsearch.hppc.ObjectLongMap;
2423
import com.carrotsearch.hppc.procedures.IntProcedure;
2524
import org.apache.lucene.index.IndexFileNames;
2625
import org.apache.lucene.util.English;
2726
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
28-
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
29-
import org.elasticsearch.action.admin.indices.stats.IndexStats;
30-
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
31-
import org.elasticsearch.action.admin.indices.stats.ShardStats;
3227
import org.elasticsearch.action.index.IndexRequestBuilder;
3328
import org.elasticsearch.action.search.SearchResponse;
3429
import org.elasticsearch.client.Client;
3530
import org.elasticsearch.cluster.ClusterState;
3631
import org.elasticsearch.cluster.metadata.IndexMetaData;
3732
import org.elasticsearch.cluster.node.DiscoveryNode;
38-
import org.elasticsearch.cluster.routing.ShardRouting;
3933
import org.elasticsearch.cluster.routing.ShardRoutingState;
4034
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
4135
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -47,13 +41,10 @@
4741
import org.elasticsearch.common.xcontent.XContentType;
4842
import org.elasticsearch.env.NodeEnvironment;
4943
import org.elasticsearch.index.IndexService;
50-
import org.elasticsearch.index.seqno.SeqNoStats;
51-
import org.elasticsearch.index.seqno.SequenceNumbers;
5244
import org.elasticsearch.index.shard.IndexEventListener;
5345
import org.elasticsearch.index.shard.IndexShard;
5446
import org.elasticsearch.index.shard.IndexShardState;
5547
import org.elasticsearch.index.shard.ShardId;
56-
import org.elasticsearch.indices.IndicesService;
5748
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
5849
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
5950
import org.elasticsearch.plugins.Plugin;
@@ -82,7 +73,6 @@
8273
import java.util.Arrays;
8374
import java.util.Collection;
8475
import java.util.List;
85-
import java.util.Optional;
8676
import java.util.concurrent.CountDownLatch;
8777
import java.util.concurrent.ExecutionException;
8878
import java.util.concurrent.Semaphore;
@@ -111,42 +101,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
111101
@Override
112102
protected void beforeIndexDeletion() throws Exception {
113103
super.beforeIndexDeletion();
114-
assertBusy(() -> {
115-
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
116-
for (IndexStats indexStats : stats.getIndices().values()) {
117-
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
118-
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
119-
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
120-
.findFirst();
121-
if (maybePrimary.isPresent() == false) {
122-
continue;
123-
}
124-
ShardStats primary = maybePrimary.get();
125-
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
126-
final ShardRouting primaryShardRouting = primary.getShardRouting();
127-
assertThat(primaryShardRouting + " should have set the global checkpoint",
128-
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
129-
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
130-
final IndicesService indicesService =
131-
internalCluster().getInstance(IndicesService.class, node.getName());
132-
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
133-
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
134-
for (ShardStats shardStats : indexShardStats) {
135-
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
136-
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
137-
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
138-
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
139-
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
140-
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
141-
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
142-
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
143-
assertThat(
144-
seqNoStats.getGlobalCheckpoint(),
145-
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
146-
}
147-
}
148-
}
149-
});
104+
assertSeqNos();
150105
}
151106

152107
public void testSimpleRelocationNoIndexing() {

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.test;
2121

22+
import com.carrotsearch.hppc.ObjectLongMap;
2223
import com.carrotsearch.randomizedtesting.RandomizedContext;
2324
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
2425
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
@@ -49,6 +50,10 @@
4950
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
5051
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
5152
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
53+
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
54+
import org.elasticsearch.action.admin.indices.stats.IndexStats;
55+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
56+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
5257
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
5358
import org.elasticsearch.action.bulk.BulkRequestBuilder;
5459
import org.elasticsearch.action.bulk.BulkResponse;
@@ -69,6 +74,7 @@
6974
import org.elasticsearch.cluster.metadata.IndexMetaData;
7075
import org.elasticsearch.cluster.metadata.MappingMetaData;
7176
import org.elasticsearch.cluster.metadata.MetaData;
77+
import org.elasticsearch.cluster.node.DiscoveryNode;
7278
import org.elasticsearch.cluster.routing.IndexRoutingTable;
7379
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
7480
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -114,6 +120,9 @@
114120
import org.elasticsearch.index.codec.CodecService;
115121
import org.elasticsearch.index.engine.Segment;
116122
import org.elasticsearch.index.mapper.DocumentMapper;
123+
import org.elasticsearch.index.seqno.SeqNoStats;
124+
import org.elasticsearch.index.seqno.SequenceNumbers;
125+
import org.elasticsearch.index.shard.IndexShard;
117126
import org.elasticsearch.index.translog.Translog;
118127
import org.elasticsearch.indices.IndicesQueryCache;
119128
import org.elasticsearch.indices.IndicesRequestCache;
@@ -161,6 +170,7 @@
161170
import java.util.List;
162171
import java.util.Locale;
163172
import java.util.Map;
173+
import java.util.Optional;
164174
import java.util.Random;
165175
import java.util.Set;
166176
import java.util.concurrent.Callable;
@@ -191,6 +201,7 @@
191201
import static org.hamcrest.Matchers.equalTo;
192202
import static org.hamcrest.Matchers.is;
193203
import static org.hamcrest.Matchers.lessThanOrEqualTo;
204+
import static org.hamcrest.Matchers.not;
194205
import static org.hamcrest.Matchers.notNullValue;
195206
import static org.hamcrest.Matchers.startsWith;
196207

@@ -2194,4 +2205,44 @@ public static Index resolveIndex(String index) {
21942205
String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
21952206
return new Index(index, uuid);
21962207
}
2208+
2209+
protected void assertSeqNos() throws Exception {
2210+
assertBusy(() -> {
2211+
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
2212+
for (IndexStats indexStats : stats.getIndices().values()) {
2213+
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
2214+
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
2215+
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
2216+
.findFirst();
2217+
if (maybePrimary.isPresent() == false) {
2218+
continue;
2219+
}
2220+
ShardStats primary = maybePrimary.get();
2221+
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
2222+
final ShardRouting primaryShardRouting = primary.getShardRouting();
2223+
assertThat(primaryShardRouting + " should have set the global checkpoint",
2224+
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
2225+
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
2226+
final IndicesService indicesService =
2227+
internalCluster().getInstance(IndicesService.class, node.getName());
2228+
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
2229+
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
2230+
for (ShardStats shardStats : indexShardStats) {
2231+
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
2232+
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
2233+
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
2234+
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
2235+
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
2236+
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
2237+
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
2238+
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
2239+
assertThat(
2240+
seqNoStats.getGlobalCheckpoint(),
2241+
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
2242+
}
2243+
}
2244+
}
2245+
});
2246+
}
2247+
21972248
}

0 commit comments

Comments
 (0)