2020package org .elasticsearch .recovery ;
2121
2222import com .carrotsearch .hppc .IntHashSet ;
23+ import com .carrotsearch .hppc .cursors .ObjectCursor ;
2324import com .carrotsearch .hppc .procedures .IntProcedure ;
2425import org .apache .lucene .index .IndexFileNames ;
2526import org .apache .lucene .util .English ;
2627import org .elasticsearch .action .ActionFuture ;
2728import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
2829import org .elasticsearch .action .admin .cluster .reroute .ClusterRerouteResponse ;
30+ import org .elasticsearch .action .admin .indices .stats .ShardStats ;
2931import org .elasticsearch .action .index .IndexRequestBuilder ;
3032import org .elasticsearch .action .index .IndexResponse ;
3133import org .elasticsearch .action .search .SearchResponse ;
4547import org .elasticsearch .common .xcontent .XContentType ;
4648import org .elasticsearch .env .NodeEnvironment ;
4749import org .elasticsearch .index .IndexService ;
50+ import org .elasticsearch .index .IndexSettings ;
51+ import org .elasticsearch .index .seqno .ReplicationTracker ;
52+ import org .elasticsearch .index .seqno .RetentionLease ;
4853import org .elasticsearch .index .shard .IndexEventListener ;
4954import org .elasticsearch .index .shard .IndexShard ;
5055import org .elasticsearch .index .shard .IndexShardState ;
7782import java .util .Arrays ;
7883import java .util .Collection ;
7984import java .util .List ;
85+ import java .util .Map ;
86+ import java .util .Set ;
8087import java .util .concurrent .CountDownLatch ;
8188import java .util .concurrent .Semaphore ;
8289import java .util .concurrent .TimeUnit ;
90+ import java .util .stream .Collectors ;
8391import java .util .stream .Stream ;
8492
8593import static org .elasticsearch .index .query .QueryBuilders .matchAllQuery ;
8896import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
8997import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertSearchHits ;
9098import static org .hamcrest .Matchers .equalTo ;
99+ import static org .hamcrest .Matchers .everyItem ;
100+ import static org .hamcrest .Matchers .in ;
91101import static org .hamcrest .Matchers .not ;
92102import static org .hamcrest .Matchers .startsWith ;
93103
@@ -103,6 +113,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
103113 @ Override
104114 protected void beforeIndexDeletion () throws Exception {
105115 super .beforeIndexDeletion ();
116+ assertActiveCopiesEstablishedPeerRecoveryRetentionLeases ();
106117 internalCluster ().assertSeqNos ();
107118 internalCluster ().assertSameDocIdsOnShards ();
108119 }
@@ -603,6 +614,49 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
603614 assertThat (client ().prepareSearch ("test" ).setSize (0 ).execute ().actionGet ().getHits ().getTotalHits ().value , equalTo (120L ));
604615 }
605616
617+ public void testRelocationEstablishedPeerRecoveryRetentionLeases () throws Exception {
618+ int halfNodes = randomIntBetween (1 , 3 );
619+ String indexName = "test" ;
620+ Settings [] nodeSettings = Stream .concat (
621+ Stream .generate (() -> Settings .builder ().put ("node.attr.color" , "blue" ).build ()).limit (halfNodes ),
622+ Stream .generate (() -> Settings .builder ().put ("node.attr.color" , "red" ).build ()).limit (halfNodes )).toArray (Settings []::new );
623+ List <String > nodes = internalCluster ().startNodes (nodeSettings );
624+ String [] blueNodes = nodes .subList (0 , halfNodes ).toArray (String []::new );
625+ String [] redNodes = nodes .subList (0 , halfNodes ).toArray (String []::new );
626+ ensureStableCluster (halfNodes * 2 );
627+ assertAcked (
628+ client ().admin ().indices ().prepareCreate (indexName ).setSettings (Settings .builder ()
629+ .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), randomBoolean ())
630+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , randomIntBetween (0 , halfNodes - 1 ))
631+ .put ("index.routing.allocation.include.color" , "blue" )));
632+ ensureGreen ("test" );
633+ assertBusy (() -> assertAllShardsOnNodes (indexName , blueNodes ));
634+ assertActiveCopiesEstablishedPeerRecoveryRetentionLeases ();
635+ client ().admin ().indices ().prepareUpdateSettings (indexName )
636+ .setSettings (Settings .builder ().put ("index.routing.allocation.include.color" , "red" )).get ();
637+ assertBusy (() -> assertAllShardsOnNodes (indexName , redNodes ));
638+ ensureGreen ("test" );
639+ assertActiveCopiesEstablishedPeerRecoveryRetentionLeases ();
640+ }
641+
642+ private void assertActiveCopiesEstablishedPeerRecoveryRetentionLeases () throws Exception {
643+ assertBusy (() -> {
644+ for (ObjectCursor <String > it : client ().admin ().cluster ().prepareState ().get ().getState ().metaData ().indices ().keys ()) {
645+ Map <ShardId , List <ShardStats >> byShardId = Stream .of (client ().admin ().indices ().prepareStats (it .value ).get ().getShards ())
646+ .collect (Collectors .groupingBy (l -> l .getShardRouting ().shardId ()));
647+ for (List <ShardStats > shardStats : byShardId .values ()) {
648+ Set <String > expectedLeaseIds = shardStats .stream ()
649+ .map (s -> ReplicationTracker .getPeerRecoveryRetentionLeaseId (s .getShardRouting ())).collect (Collectors .toSet ());
650+ for (ShardStats shardStat : shardStats ) {
651+ Set <String > actualLeaseIds = shardStat .getRetentionLeaseStats ().retentionLeases ().leases ().stream ()
652+ .map (RetentionLease ::id ).collect (Collectors .toSet ());
653+ assertThat (expectedLeaseIds , everyItem (in (actualLeaseIds )));
654+ }
655+ }
656+ }
657+ });
658+ }
659+
606660 class RecoveryCorruption implements StubbableTransport .SendRequestBehavior {
607661
608662 private final CountDownLatch corruptionCount ;
0 commit comments