Skip to content

Commit af08df8

Browse files
committed
Ensure flush happens on shard idle
This adds 2 testcases that test if a shard goes idle pending (uncommitted) segments are committed and unreferenced files will be freed. Relates to #29482
1 parent ba0eaab commit af08df8

File tree

5 files changed

+163
-12
lines changed

5 files changed

+163
-12
lines changed

server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
2828
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2929
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
30+
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
31+
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
32+
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
3033
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
3134
import org.elasticsearch.action.admin.indices.stats.CommonStats;
3235
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -64,6 +67,7 @@
6467
import java.util.Arrays;
6568
import java.util.Collection;
6669
import java.util.List;
70+
import java.util.Map;
6771
import java.util.stream.IntStream;
6872

6973
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -467,4 +471,77 @@ public void testCreateShrinkWithIndexSort() throws Exception {
467471
flushAndRefresh();
468472
assertSortedSegments("target", expectedIndexSort);
469473
}
474+
475+
476+
public void testShrinkCommitsMergeOnIdle() throws Exception {
477+
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
478+
.put("index.number_of_replicas", 0)
479+
.put("number_of_shards", 5)).get();
480+
for (int i = 0; i < 30; i++) {
481+
client().prepareIndex("source", "type")
482+
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
483+
}
484+
client().admin().indices().prepareFlush("source").get();
485+
ImmutableOpenMap<String, DiscoveryNode> dataNodes =
486+
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
487+
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
488+
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
489+
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
490+
// to the require._name below.
491+
ensureGreen();
492+
// relocate all shards to one node such that we can merge it.
493+
client().admin().indices().prepareUpdateSettings("source")
494+
.setSettings(Settings.builder()
495+
.put("index.routing.allocation.require._name", discoveryNodes[0].getName())
496+
.put("index.blocks.write", true)).get();
497+
ensureGreen();
498+
IndicesSegmentResponse sourceStats = client().admin().indices().prepareSegments("source").get();
499+
500+
// disable rebalancing to be able to capture the right stats. balancing can move the target primary
501+
// making it hard to pin point the source shards.
502+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
503+
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"
504+
)).get();
505+
506+
// now merge source into a single shard index
507+
assertAcked(client().admin().indices().prepareResizeIndex("source", "target")
508+
.setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
509+
ensureGreen();
510+
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
511+
IndexMetaData target = clusterStateResponse.getState().getMetaData().index("target");
512+
client().admin().indices().prepareForceMerge("target").setMaxNumSegments(1).setFlush(false).get();
513+
IndicesSegmentResponse targetSegStats = client().admin().indices().prepareSegments("target").get();
514+
ShardSegments segmentsStats = targetSegStats.getIndices().get("target").getShards().get(0).getShards()[0];
515+
assertTrue(segmentsStats.getNumberOfCommitted() > 0);
516+
assertNotEquals(segmentsStats.getSegments(), segmentsStats.getNumberOfCommitted());
517+
518+
Iterable<IndicesService> dataNodeInstances = internalCluster().getDataNodeInstances(IndicesService.class);
519+
for (IndicesService service : dataNodeInstances) {
520+
if (service.hasIndex(target.getIndex())) {
521+
IndexService indexShards = service.indexService(target.getIndex());
522+
IndexShard shard = indexShards.getShard(0);
523+
assertTrue(shard.isActive());
524+
shard.checkIdle(0);
525+
assertFalse(shard.isActive());
526+
}
527+
}
528+
assertBusy(() -> {
529+
IndicesSegmentResponse targetStats = client().admin().indices().prepareSegments("target").get();
530+
ShardSegments targetShardSegments = targetStats.getIndices().get("target").getShards().get(0).getShards()[0];
531+
Map<Integer, IndexShardSegments> source = sourceStats.getIndices().get("source").getShards();
532+
int numSourceSegments = 0;
533+
for (IndexShardSegments s : source.values()) {
534+
numSourceSegments += s.getAt(0).getNumberOfCommitted();
535+
}
536+
assertTrue(targetShardSegments.getSegments().size() < numSourceSegments);
537+
assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getNumberOfSearch());
538+
assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getSegments().size());
539+
assertEquals(1, targetShardSegments.getSegments().size());
540+
});
541+
542+
// clean up
543+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
544+
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String)null
545+
)).get();
546+
}
470547
}

server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
264264
RecoverySource.PeerRecoverySource.INSTANCE);
265265

266266
final IndexShard newReplica =
267-
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {});
267+
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
268268
replicas.add(newReplica);
269269
updateAllocationIDsOnPrimary();
270270
return newReplica;

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@
7070
import org.elasticsearch.core.internal.io.IOUtils;
7171
import org.elasticsearch.env.NodeEnvironment;
7272
import org.elasticsearch.index.IndexSettings;
73+
import org.elasticsearch.index.MergePolicyConfig;
7374
import org.elasticsearch.index.VersionType;
7475
import org.elasticsearch.index.engine.Engine;
7576
import org.elasticsearch.index.engine.EngineException;
7677
import org.elasticsearch.index.engine.InternalEngine;
78+
import org.elasticsearch.index.engine.Segment;
7779
import org.elasticsearch.index.engine.SegmentsStats;
7880
import org.elasticsearch.index.fielddata.FieldDataStats;
7981
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -1865,7 +1867,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
18651867
closeShards(shard);
18661868
IndexShard newShard = newShard(
18671869
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
1868-
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
1870+
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
18691871

18701872
recoverShardFromStore(newShard);
18711873

@@ -2011,7 +2013,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
20112013
closeShards(shard);
20122014
IndexShard newShard = newShard(
20132015
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
2014-
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
2016+
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
20152017

20162018
recoverShardFromStore(newShard);
20172019

@@ -2494,7 +2496,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
24942496
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
24952497
.build();
24962498
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
2497-
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
2499+
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
24982500

24992501
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
25002502
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
@@ -2830,4 +2832,74 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
28302832
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
28312833
assertThat(breaker.getUsed(), equalTo(0L));
28322834
}
2835+
2836+
public void testFlushOnInactive() throws Exception {
2837+
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
2838+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
2839+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
2840+
.build();
2841+
IndexMetaData metaData = IndexMetaData.builder("test")
2842+
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
2843+
.settings(settings)
2844+
.primaryTerm(0, 1).build();
2845+
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId(metaData.getIndex(), 0), "n1", true, ShardRoutingState
2846+
.INITIALIZING, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
2847+
final ShardId shardId = shardRouting.shardId();
2848+
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
2849+
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
2850+
AtomicBoolean markedInactive = new AtomicBoolean();
2851+
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
2852+
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, () -> {
2853+
}, new IndexEventListener() {
2854+
@Override
2855+
public void onShardInactive(IndexShard indexShard) {
2856+
markedInactive.set(true);
2857+
primaryRef.get().flush(new FlushRequest());
2858+
}
2859+
});
2860+
primaryRef.set(primary);
2861+
recoverShardFromStore(primary);
2862+
for (int i = 0; i < 3; i++) {
2863+
indexDoc(primary, "test", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
2864+
primary.refresh("test"); // produce segments
2865+
}
2866+
List<Segment> segments = primary.segments(false);
2867+
Set<String> names = new HashSet<>();
2868+
for (Segment segment : segments) {
2869+
assertFalse(segment.committed);
2870+
assertTrue(segment.search);
2871+
names.add(segment.getName());
2872+
}
2873+
assertEquals(3, segments.size());
2874+
primary.flush(new FlushRequest());
2875+
primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
2876+
primary.refresh("test");
2877+
segments = primary.segments(false);
2878+
for (Segment segment : segments) {
2879+
if (names.contains(segment.getName())) {
2880+
assertTrue(segment.committed);
2881+
assertFalse(segment.search);
2882+
} else {
2883+
assertFalse(segment.committed);
2884+
assertTrue(segment.search);
2885+
}
2886+
}
2887+
assertEquals(4, segments.size());
2888+
2889+
assertFalse(markedInactive.get());
2890+
assertBusy(() -> {
2891+
primary.checkIdle(0);
2892+
assertFalse(primary.isActive());
2893+
});
2894+
2895+
assertTrue(markedInactive.get());
2896+
segments = primary.segments(false);
2897+
assertEquals(1, segments.size());
2898+
for (Segment segment : segments) {
2899+
assertTrue(segment.committed);
2900+
assertTrue(segment.search);
2901+
}
2902+
closeShards(primary);
2903+
}
2904+
28332905
}

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
9797

9898
// build a new shard using the same store directory as the closed shard
9999
ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
100-
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {});
100+
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {},
101+
EMPTY_EVENT_LISTENER);
101102

102103
// restore the shard
103104
recoverShardFromSnapshot(shard, snapshot, repository);

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@
106106
*/
107107
public abstract class IndexShardTestCase extends ESTestCase {
108108

109+
public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {};
110+
109111
protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
110112
@Override
111113
public void onRecoveryDone(RecoveryState state) {
@@ -261,24 +263,25 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
261263
final ShardId shardId = routing.shardId();
262264
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
263265
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
264-
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners);
266+
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
267+
EMPTY_EVENT_LISTENER, listeners);
265268
}
266269

267270
/**
268271
* creates a new initializing shard.
269-
*
270-
* @param routing shard routing to use
272+
* @param routing shard routing to use
271273
* @param shardPath path to use for shard data
272274
* @param indexMetaData indexMetaData for the shard, including any mapping
273275
* @param indexSearcherWrapper an optional wrapper to be used during searchers
274276
* @param globalCheckpointSyncer callback for syncing global checkpoints
277+
* @param indexEventListener
275278
* @param listeners an optional set of listeners to add to the shard
276279
*/
277280
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
278281
@Nullable IndexSearcherWrapper indexSearcherWrapper,
279282
@Nullable EngineFactory engineFactory,
280283
Runnable globalCheckpointSyncer,
281-
IndexingOperationListener... listeners) throws IOException {
284+
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
282285
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
283286
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
284287
final IndexShard indexShard;
@@ -290,8 +293,6 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe
290293
indexSettings.getSettings(), "index");
291294
mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true);
292295
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
293-
final IndexEventListener indexEventListener = new IndexEventListener() {
294-
};
295296
final Engine.Warmer warmer = searcher -> {
296297
};
297298
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
@@ -336,7 +337,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
336337
null,
337338
current.engineFactory,
338339
current.getGlobalCheckpointSyncer(),
339-
listeners);
340+
EMPTY_EVENT_LISTENER, listeners);
340341
}
341342

342343
/**

0 commit comments

Comments
 (0)