1818 */
1919package org .elasticsearch .indices .state ;
2020
21+ import org .apache .logging .log4j .message .ParameterizedMessage ;
2122import org .elasticsearch .action .admin .cluster .reroute .ClusterRerouteRequest ;
2223import org .elasticsearch .action .support .master .AcknowledgedResponse ;
24+ import org .elasticsearch .cluster .ClusterState ;
2325import org .elasticsearch .cluster .node .DiscoveryNode ;
2426import org .elasticsearch .cluster .routing .IndexRoutingTable ;
2527import org .elasticsearch .cluster .routing .ShardRouting ;
3234import org .elasticsearch .cluster .service .ClusterService ;
3335import org .elasticsearch .common .settings .Settings ;
3436import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
37+ import org .elasticsearch .index .shard .ShardId ;
3538import org .elasticsearch .indices .recovery .PeerRecoverySourceService ;
3639import org .elasticsearch .indices .recovery .StartRecoveryRequest ;
3740import org .elasticsearch .plugins .Plugin ;
3841import org .elasticsearch .test .BackgroundIndexer ;
3942import org .elasticsearch .test .ESIntegTestCase ;
40- import org .elasticsearch .test .junit .annotations .TestLogging ;
4143import org .elasticsearch .test .transport .MockTransportService ;
44+ import org .elasticsearch .test .transport .StubbableTransport ;
4245import org .elasticsearch .transport .TransportService ;
4346
4447import java .util .ArrayList ;
5760import static org .elasticsearch .indices .state .CloseIndexIT .assertIndexIsOpened ;
5861import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
5962import static org .hamcrest .Matchers .greaterThan ;
63+ import static org .hamcrest .Matchers .hasSize ;
6064
6165@ ESIntegTestCase .ClusterScope (minNumDataNodes = 2 )
6266public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
@@ -68,9 +72,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6872
/**
 * Builds the per-node settings used by this test: the settings returned by
 * {@code super.nodeSettings(nodeOrdinal)} plus three allocation overrides.
 * The node-concurrent-recoveries and node-initial-primaries-recoveries keys
 * are both set to {@code Integer.MAX_VALUE}, and the cluster-concurrent-rebalance
 * key is set to {@code -1}.
 *
 * NOTE(review): presumably these values lift recovery/rebalance throttling so
 * that all relocations issued by the test can start at once -- confirm against
 * the allocation deciders' documentation.
 *
 * @param nodeOrdinal forwarded unchanged to {@code super.nodeSettings}
 * @return the built {@code Settings}
 */
6973 @ Override
7074 protected Settings nodeSettings (int nodeOrdinal ) {
// Single value shared by both recovery-related settings below.
75+ final int maxRecoveries = Integer .MAX_VALUE ;
7176 return Settings .builder ()
7277 .put (super .nodeSettings (nodeOrdinal ))
73- .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), Integer .MAX_VALUE )
78+ .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), maxRecoveries )
79+ .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING .getKey (), maxRecoveries )
// NOTE(review): -1 presumably means "no limit" for concurrent rebalances -- confirm.
7480 .put (ConcurrentRebalanceAllocationDecider .CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING .getKey (), -1 )
7581 .build ();
7682 }
@@ -80,8 +86,6 @@ protected int maximumNumberOfShards() {
8086 return 3 ;
8187 }
8288
83- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/38090" )
84- @ TestLogging ("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG" )
8589 public void testCloseWhileRelocatingShards () throws Exception {
8690 final String [] indices = new String [randomIntBetween (3 , 5 )];
8791 final Map <String , Long > docsPerIndex = new HashMap <>();
@@ -120,21 +124,19 @@ public void testCloseWhileRelocatingShards() throws Exception {
120124
121125 final String targetNode = internalCluster ().startDataOnlyNode ();
122126 ensureClusterSizeConsistency (); // wait for the master to finish processing join.
123- final MockTransportService targetTransportService =
124- (MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode );
125127
126- final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
127128 try {
128129 final ClusterService clusterService = internalCluster ().getInstance (ClusterService .class , internalCluster ().getMasterName ());
130+ final ClusterState state = clusterService .state ();
129131 final CountDownLatch latch = new CountDownLatch (indices .length );
130- final CountDownLatch release = new CountDownLatch (1 );
132+ final CountDownLatch release = new CountDownLatch (indices . length );
131133
132134 // relocate one shard for every index to be closed
133135 final AllocationCommands commands = new AllocationCommands ();
134136 for (final String index : indices ) {
135137 final NumShards numShards = getNumShards (index );
136138 final int shardId = numShards .numPrimaries == 1 ? 0 : randomIntBetween (0 , numShards .numPrimaries - 1 );
137- final IndexRoutingTable indexRoutingTable = clusterService . state () .routingTable ().index (index );
139+ final IndexRoutingTable indexRoutingTable = state .routingTable ().index (index );
138140
139141 final ShardRouting primary = indexRoutingTable .shard (shardId ).primaryShard ();
140142 assertTrue (primary .started ());
@@ -147,24 +149,49 @@ public void testCloseWhileRelocatingShards() throws Exception {
147149 currentNodeId = replica .currentNodeId ();
148150 }
149151 }
152+ commands .add (new MoveAllocationCommand (index , shardId , state .nodes ().resolveNode (currentNodeId ).getName (), targetNode ));
153+ }
154+
155+ // Build the list of shards for which recoveries will be blocked
156+ final Set <ShardId > blockedShards = commands .commands ().stream ()
157+ .map (c -> (MoveAllocationCommand ) c )
158+ .map (c -> new ShardId (clusterService .state ().metaData ().index (c .index ()).getIndex (), c .shardId ()))
159+ .collect (Collectors .toSet ());
160+ assertThat (blockedShards , hasSize (indices .length ));
161+
162+ final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
163+ final Set <String > interruptedRecoveries = ConcurrentCollections .newConcurrentSet ();
150164
151- final DiscoveryNode sourceNode = clusterService .state ().nodes ().resolveNode (primary .currentNodeId ());
152- targetTransportService .addSendBehavior (internalCluster ().getInstance (TransportService .class , sourceNode .getName ()),
153- (connection , requestId , action , request , options ) -> {
154- if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
155- logger .debug ("blocking recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
156- latch .countDown ();
157- try {
158- release .await ();
159- logger .debug ("releasing recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
160- } catch (InterruptedException e ) {
161- throw new AssertionError (e );
162- }
163- }
164- connection .sendRequest (requestId , action , request , options );
165+ // Create a SendRequestBehavior that blocks outgoing start recovery requests for the shards listed in blockedShards
166+ final StubbableTransport .SendRequestBehavior sendBehavior = (connection , requestId , action , request , options ) -> {
167+ if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
168+ final StartRecoveryRequest startRecoveryRequest = ((StartRecoveryRequest ) request );
169+ if (blockedShards .contains (startRecoveryRequest .shardId ())) {
170+ logger .debug ("blocking recovery of shard {}" , startRecoveryRequest .shardId ());
171+ latch .countDown ();
172+ try {
173+ release .await ();
174+ logger .debug ("releasing recovery of shard {}" , startRecoveryRequest .shardId ());
175+ } catch (final InterruptedException e ) {
176+ logger .warn (() -> new ParameterizedMessage ("exception when releasing recovery of shard {}" ,
177+ startRecoveryRequest .shardId ()), e );
178+ interruptedRecoveries .add (startRecoveryRequest .shardId ().getIndexName ());
179+ Thread .currentThread ().interrupt ();
180+ return ;
165181 }
166- );
167- commands .add (new MoveAllocationCommand (index , shardId , currentNodeId , targetNode ));
182+ }
183+ }
184+ connection .sendRequest (requestId , action , request , options );
185+ };
186+
187+ final MockTransportService targetTransportService =
188+ (MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode );
189+
190+ for (DiscoveryNode node : state .getNodes ()) {
191+ if (node .isDataNode () && node .getName ().equals (targetNode ) == false ) {
192+ final TransportService sourceTransportService = internalCluster ().getInstance (TransportService .class , node .getName ());
193+ targetTransportService .addSendBehavior (sourceTransportService , sendBehavior );
194+ }
168195 }
169196
170197 assertAcked (client ().admin ().cluster ().reroute (new ClusterRerouteRequest ().commands (commands )).get ());
@@ -223,12 +250,15 @@ public void testCloseWhileRelocatingShards() throws Exception {
223250
224251 targetTransportService .clearAllRules ();
225252
253+ // If a shard recovery has been interrupted, we expect its index to be closed
254+ interruptedRecoveries .forEach (CloseIndexIT ::assertIndexIsClosed );
255+
226256 assertThat ("Consider that the test failed if no indices were successfully closed" , acknowledgedCloses .size (), greaterThan (0 ));
227257 assertAcked (client ().admin ().indices ().prepareOpen ("index-*" ));
228258 ensureGreen (indices );
229259
230260 for (String index : acknowledgedCloses ) {
231- long docsCount = client ().prepareSearch (index ).setSize (0 ).get ().getHits ().getTotalHits ();
261+ long docsCount = client ().prepareSearch (index ).setSize (0 ).setTrackTotalHits ( true ). get ().getHits ().getTotalHits ();
232262 assertEquals ("Expected " + docsPerIndex .get (index ) + " docs in index " + index + " but got " + docsCount
233263 + " (close acknowledged=" + acknowledgedCloses .contains (index ) + ")" , (long ) docsPerIndex .get (index ), docsCount );
234264 }
0 commit comments