3535import org .elasticsearch .action .support .ActiveShardCount ;
3636import org .elasticsearch .client .Client ;
3737import org .elasticsearch .cluster .ClusterInfoService ;
38+ import org .elasticsearch .cluster .ClusterState ;
3839import org .elasticsearch .cluster .InternalClusterInfoService ;
3940import org .elasticsearch .cluster .metadata .IndexMetaData ;
4041import org .elasticsearch .cluster .node .DiscoveryNode ;
4142import org .elasticsearch .cluster .routing .Murmur3HashFunction ;
4243import org .elasticsearch .cluster .routing .RoutingTable ;
44+ import org .elasticsearch .cluster .routing .ShardRouting ;
4345import org .elasticsearch .cluster .routing .UnassignedInfo ;
46+ import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
4447import org .elasticsearch .common .Priority ;
4548import org .elasticsearch .common .collect .ImmutableOpenMap ;
4649import org .elasticsearch .common .settings .Settings ;
6063import java .util .Arrays ;
6164import java .util .Collection ;
6265import java .util .List ;
66+ import java .util .stream .Collectors ;
6367import java .util .stream .IntStream ;
6468
6569import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
@@ -239,41 +243,60 @@ public void testCreateShrinkIndex() {
239243 client ().admin ().cluster ().prepareState ().get ().getState ().nodes ().getDataNodes ();
240244 assertTrue ("at least 2 nodes but was: " + dataNodes .size (), dataNodes .size () >= 2 );
241245 DiscoveryNode [] discoveryNodes = dataNodes .values ().toArray (DiscoveryNode .class );
242- String mergeNode = discoveryNodes [0 ].getName ();
243246 // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
244247 // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
245248 // to the require._name below.
246249 ensureGreen ();
247250 // relocate all shards to one node such that we can merge it.
248251 client ().admin ().indices ().prepareUpdateSettings ("source" )
249252 .setSettings (Settings .builder ()
250- .put ("index.routing.allocation.require._name" , mergeNode )
253+ .put ("index.routing.allocation.require._name" , discoveryNodes [ 0 ]. getName () )
251254 .put ("index.blocks.write" , true )).get ();
252255 ensureGreen ();
253256
254257 final IndicesStatsResponse sourceStats = client ().admin ().indices ().prepareStats ("source" ).setSegments (true ).get ();
255- final long maxSeqNo =
256- Arrays .stream (sourceStats .getShards ()).map (ShardStats ::getSeqNoStats ).mapToLong (SeqNoStats ::getMaxSeqNo ).max ().getAsLong ();
257- final long maxUnsafeAutoIdTimestamp =
258- Arrays .stream (sourceStats .getShards ())
259- .map (ShardStats ::getStats )
260- .map (CommonStats ::getSegments )
261- .mapToLong (SegmentsStats ::getMaxUnsafeAutoIdTimestamp )
262- .max ()
263- .getAsLong ();
264- // now merge source into a single shard index
265258
259+ // disable rebalancing to be able to capture the right stats. balancing can move the target primary
260+ // making it hard to pin point the source shards.
261+ client ().admin ().cluster ().prepareUpdateSettings ().setTransientSettings (Settings .builder ().put (
262+ EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey (), "none"
263+ )).get ();
264+
265+
266+ // now merge source into a single shard index
266267 final boolean createWithReplicas = randomBoolean ();
267268 assertAcked (client ().admin ().indices ().prepareShrinkIndex ("source" , "target" )
268269 .setSettings (Settings .builder ().put ("index.number_of_replicas" , createWithReplicas ? 1 : 0 ).build ()).get ());
269270 ensureGreen ();
270271
272+ // resolve true merge node - this is not always the node we required as all shards may be on another node
273+ final ClusterState state = client ().admin ().cluster ().prepareState ().get ().getState ();
274+ DiscoveryNode mergeNode = state .nodes ().get (state .getRoutingTable ().index ("target" ).shard (0 ).primaryShard ().currentNodeId ());
275+ logger .info ("merge node {}" , mergeNode );
276+
277+ final long maxSeqNo = Arrays .stream (sourceStats .getShards ())
278+ .filter (shard -> shard .getShardRouting ().currentNodeId ().equals (mergeNode .getId ()))
279+ .map (ShardStats ::getSeqNoStats ).mapToLong (SeqNoStats ::getMaxSeqNo ).max ().getAsLong ();
280+ final long maxUnsafeAutoIdTimestamp = Arrays .stream (sourceStats .getShards ())
281+ .filter (shard -> shard .getShardRouting ().currentNodeId ().equals (mergeNode .getId ()))
282+ .map (ShardStats ::getStats )
283+ .map (CommonStats ::getSegments )
284+ .mapToLong (SegmentsStats ::getMaxUnsafeAutoIdTimestamp )
285+ .max ()
286+ .getAsLong ();
287+
288+ for (ShardStats shard : Arrays .stream (sourceStats .getShards ()).filter (shard -> shard .getShardRouting ().currentNodeId ().equals (mergeNode .getId ())).collect (Collectors .toList ())) {
289+ logger .info ("used {}, timestamp: {}" , shard .getShardRouting (), shard .getStats ().getSegments ().getMaxUnsafeAutoIdTimestamp ());
290+ }
291+
271292 final IndicesStatsResponse targetStats = client ().admin ().indices ().prepareStats ("target" ).get ();
272293 for (final ShardStats shardStats : targetStats .getShards ()) {
273294 final SeqNoStats seqNoStats = shardStats .getSeqNoStats ();
274- assertThat (seqNoStats .getMaxSeqNo (), equalTo (maxSeqNo ));
275- assertThat (seqNoStats .getLocalCheckpoint (), equalTo (maxSeqNo ));
276- assertThat (shardStats .getStats ().getSegments ().getMaxUnsafeAutoIdTimestamp (), equalTo (maxUnsafeAutoIdTimestamp ));
295+ final ShardRouting shardRouting = shardStats .getShardRouting ();
296+ assertThat ("failed on " + shardRouting , seqNoStats .getMaxSeqNo (), equalTo (maxSeqNo ));
297+ assertThat ("failed on " + shardRouting , seqNoStats .getLocalCheckpoint (), equalTo (maxSeqNo ));
298+ assertThat ("failed on " + shardRouting ,
299+ shardStats .getStats ().getSegments ().getMaxUnsafeAutoIdTimestamp (), equalTo (maxUnsafeAutoIdTimestamp ));
277300 }
278301
279302 final int size = docs > 0 ? 2 * docs : 1 ;
@@ -297,6 +320,11 @@ public void testCreateShrinkIndex() {
297320 assertHitCount (client ().prepareSearch ("source" ).setSize (size ).setQuery (new TermsQueryBuilder ("foo" , "bar" )).get (), docs );
298321 GetSettingsResponse target = client ().admin ().indices ().prepareGetSettings ("target" ).get ();
299322 assertEquals (version , target .getIndexToSettings ().get ("target" ).getAsVersion ("index.version.created" , null ));
323+
324+ // clean up
325+ client ().admin ().cluster ().prepareUpdateSettings ().setTransientSettings (Settings .builder ().put (
326+ EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey (), (String )null
327+ )).get ();
300328 }
301329 /**
302330 * Tests that we can manually recover from a failed allocation due to shards being moved away etc.
0 commit comments